You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by sy...@apache.org on 2022/05/17 10:06:52 UTC

[zookeeper] branch branch-3.5 updated (af44dabd4 -> a5f0eb0e5)

This is an automated email from the ASF dual-hosted git repository.

symat pushed a change to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


    from af44dabd4 ZOOKEEPER-3706: ZooKeeper.close() would leak SendThread when the netw…
     new 67285ce5b ZOOKEEPER-3161: Refactor QuorumPeerMainTest.java: move commonly used functions to base class
     new a5f0eb0e5 ZOOKEEPER-4269: acceptedEpoch.tmp rename failure will cause server startup error

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../zookeeper/common/AtomicFileOutputStream.java   |   3 +-
 .../apache/zookeeper/server/quorum/QuorumPeer.java |  14 +-
 .../quorum/CurrentEpochWriteFailureTest.java       | 117 ++++++++++++++++
 .../server/quorum/QuorumPeerMainTest.java          | 147 +--------------------
 .../server/quorum/QuorumPeerTestBase.java          | 144 ++++++++++++++++++++
 5 files changed, 277 insertions(+), 148 deletions(-)
 create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CurrentEpochWriteFailureTest.java


[zookeeper] 02/02: ZOOKEEPER-4269: acceptedEpoch.tmp rename failure will cause server startup error

Posted by sy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git

commit a5f0eb0e5303426f9d7e3b0062e8d396ac9d98e5
Author: Mohammad Arshad <ar...@apache.org>
AuthorDate: Thu Apr 1 02:54:37 2021 +0530

    ZOOKEEPER-4269: acceptedEpoch.tmp rename failure will cause server startup error
    
    Use the current epoch from currentEpoch.tmp if it is available
    
    Author: Mohammad Arshad <ar...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>,Damien Diederen <dd...@crosstwine.com>
    
    Closes #1668 from arshadmohammad/ZOOKEEPER-4269-branch-3.6
    
    (cherry picked from commit 54e563bfe13508fc3707d45e47d37e0c201f19ed)
---
 .../zookeeper/common/AtomicFileOutputStream.java   |   3 +-
 .../apache/zookeeper/server/quorum/QuorumPeer.java |  14 ++-
 .../quorum/CurrentEpochWriteFailureTest.java       | 117 +++++++++++++++++++++
 3 files changed, 132 insertions(+), 2 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/AtomicFileOutputStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/AtomicFileOutputStream.java
index 740ae8f67..35f3379ca 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/AtomicFileOutputStream.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/AtomicFileOutputStream.java
@@ -44,7 +44,8 @@ import org.slf4j.LoggerFactory;
  * place.
  */
 public class AtomicFileOutputStream extends FilterOutputStream {
-    private static final String TMP_EXTENSION = ".tmp";
+
+    public static final String TMP_EXTENSION = ".tmp";
 
     private final static Logger LOG = LoggerFactory
             .getLogger(AtomicFileOutputStream.class);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index daf605cab..54128d4ff 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.security.sasl.SaslException;
 
 import org.apache.zookeeper.KeeperException.BadArgumentsException;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.apache.zookeeper.common.AtomicFileWritingIdiom;
 import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
 import org.apache.zookeeper.common.QuorumX509Util;
@@ -976,7 +977,18 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             	writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
             }
             if (epochOfZxid > currentEpoch) {
-                throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
+                // currentEpoch.tmp file in snapshot directory
+                File currentTmp = new File(getTxnFactory().getSnapDir(),
+                    CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
+                if (currentTmp.exists()) {
+                    long epochOfTmp = readLongFromFile(currentTmp.getName());
+                    LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
+                    setCurrentEpoch(epochOfTmp);
+                } else {
+                    throw new IOException(
+                        "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
+                            + ", is older than the last zxid, " + lastProcessedZxid);
+                }
             }
             try {
                 acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CurrentEpochWriteFailureTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CurrentEpochWriteFailureTest.java
new file mode 100644
index 000000000..9a172eee1
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CurrentEpochWriteFailureTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CurrentEpochWriteFailureTest extends QuorumPeerTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(CurrentEpochWriteFailureTest.class);
+    private Servers servers;
+    private int clientPort;
+
+    @After
+    public void tearDown() throws InterruptedException {
+        if (servers != null) {
+            servers.shutDownAllServers();
+        }
+    }
+
+    /*
+     * ZOOKEEPER-4269:
+     * The current epoch is first written to the temporary file currentEpoch.tmp, and this file
+     * is then renamed to currentEpoch.
+     * A failure to rename currentEpoch.tmp, whether caused by an exception or a power-off,
+     * makes server startup fail with the message "The current epoch, x, is older than the last
+     * zxid y".
+     * To handle this scenario, the current epoch is read from this temp file as well.
+     */
+    @Test
+    public void testReadCurrentEpochFromAcceptedEpochTmpFile() throws Exception {
+        startServers();
+        writeSomeData();
+
+        restartServers();
+        writeSomeData();
+
+        MainThread firstServer = servers.mt[0];
+
+        // The servers were started twice, so the current epoch must be two
+        long currentEpoch = firstServer.getQuorumPeer().getCurrentEpoch();
+        assertEquals(2, currentEpoch);
+
+        // Initialize files for later use
+        File snapDir = firstServer.getQuorumPeer().getTxnFactory().getSnapDir();
+        File currentEpochFile = new File(snapDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
+        File currentEpochTempFile = new File(snapDir,
+            QuorumPeer.CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
+
+        // Shutdown servers
+        servers.shutDownAllServers();
+        waitForAll(servers, ZooKeeper.States.CONNECTING);
+
+        // Simulate a failed rename of currentEpoch.tmp to currentEpoch.
+        // In this case the currentEpoch file has the old epoch and currentEpoch.tmp has the latest epoch.
+        FileUtils.write(currentEpochFile, Long.toString(currentEpoch - 1), "UTF-8");
+        FileUtils.write(currentEpochTempFile, Long.toString(currentEpoch), "UTF-8");
+
+        // Restart the servers; all servers should restart successfully.
+        servers.restartAllServersAndClients(this);
+
+        // Check the first server where problem was injected.
+        assertTrue("server " + firstServer.getMyid()
+            + " is not up as file currentEpoch.tmp rename to currentEpoch file was failed"
+            + " which lead current epoch inconsistent state.", ClientBase
+            .waitForServerUp("127.0.0.1:" + firstServer.getClientPort(), CONNECTION_TIMEOUT));
+    }
+
+    private void restartServers() throws InterruptedException, IOException {
+        servers.shutDownAllServers();
+        waitForAll(servers, ZooKeeper.States.CONNECTING);
+        servers.restartAllServersAndClients(this);
+        waitForAll(servers, ZooKeeper.States.CONNECTED);
+    }
+
+    private void writeSomeData() throws Exception {
+        ZooKeeper client = ClientBase.createZKClient("127.0.0.1:" + clientPort);
+        String path = "/somePath" + System.currentTimeMillis();
+        String data = "someData";
+        client.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        byte[] data1 = client.getData(path, false, null);
+        assertEquals(data, new String(data1));
+        client.close();
+    }
+
+    private void startServers() throws Exception {
+        servers = LaunchServers(3);
+        clientPort = servers.clientPorts[0];
+    }
+}


[zookeeper] 01/02: ZOOKEEPER-3161: Refactor QuorumPeerMainTest.java: move commonly used functions to base class

Posted by sy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git

commit 67285ce5bb209eb926b2850a0c37e90e1db13382
Author: Andor Molnar <an...@apache.org>
AuthorDate: Fri Oct 12 10:25:27 2018 +0200

    ZOOKEEPER-3161: Refactor QuorumPeerMainTest.java: move commonly used functions to base class
    
    Move the following methods to QuorumPeerTestBase.java:
    
    - tearDown()
    - LaunchServers()
    - waitForOne(), waitForAll()
    - logStates()
    
    Author: Andor Molnar <an...@apache.org>
    
    Reviewers: andor@apache.org
    
    Closes #659 from anmolnar/ZOOKEEPER-3161
    
    (cherry picked from commit ee250f141678f79e9517bfc8913956199fad55bb)
---
 .../server/quorum/QuorumPeerMainTest.java          | 147 +--------------------
 .../server/quorum/QuorumPeerTestBase.java          | 144 ++++++++++++++++++++
 2 files changed, 145 insertions(+), 146 deletions(-)

diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index a1eeaa3ac..aef5bd417 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -35,7 +35,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.nio.file.Paths;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -52,7 +51,6 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -63,7 +61,6 @@ import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -73,22 +70,6 @@ import org.junit.Test;
  */
 public class QuorumPeerMainTest extends QuorumPeerTestBase {
 
-    private Servers servers;
-    private int numServers = 0;
-
-    @After
-    public void tearDown() throws Exception {
-        if (servers == null || servers.mt == null) {
-            LOG.info("No servers to shutdown!");
-            return;
-        }
-        for (int i = 0; i < numServers; i++) {
-            if (i < servers.mt.length) {
-                servers.mt[i].shutdown();
-            }
-        }
-    }
-
     /**
      * Verify the ability to start a cluster.
      */
@@ -454,132 +435,6 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing);
     }
 
-    public static void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
-        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
-        while (zk.getState() != state) {
-            if (iterations-- == 0) {
-                throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
-            }
-            Thread.sleep(500);
-        }
-    }
-
-    private void waitForAll(Servers servers, States state) throws InterruptedException {
-        waitForAll(servers.zk, state);
-    }
-
-    public static void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
-        int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
-        boolean someoneNotConnected = true;
-        while (someoneNotConnected) {
-            if (iterations-- == 0) {
-                logStates(zks);
-                ClientBase.logAllStackTraces();
-                throw new RuntimeException("Waiting too long");
-            }
-
-            someoneNotConnected = false;
-            for (ZooKeeper zk : zks) {
-                if (zk.getState() != state) {
-                    someoneNotConnected = true;
-                    break;
-                }
-            }
-            Thread.sleep(1000);
-        }
-    }
-
-    public static void logStates(ZooKeeper[] zks) {
-            StringBuilder sbBuilder = new StringBuilder("Connection States: {");
-           for (int i = 0; i < zks.length; i++) {
-                sbBuilder.append(i + " : " + zks[i].getState() + ", ");
-           }
-            sbBuilder.append('}');
-            LOG.error(sbBuilder.toString());
-    }
-
-    // This class holds the servers and clients for those servers
-    private static class Servers {
-        MainThread mt[];
-        ZooKeeper zk[];
-        int[] clientPorts;
-
-        public void shutDownAllServers() throws InterruptedException {
-            for (MainThread t: mt) {
-                t.shutdown();
-            }
-        }
-
-        public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
-            for (MainThread t : mt) {
-                if (!t.isAlive()) {
-                    t.start();
-                }
-            }
-            for (int i = 0; i < zk.length; i++) {
-                restartClient(i, watcher);
-            }
-        }
-
-        public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
-            if (zk[clientIndex] != null) {
-                zk[clientIndex].close();
-            }
-            zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
-        }
-
-        public int findLeader() {
-            for (int i = 0; i < mt.length; i++) {
-                if (mt[i].main.quorumPeer.leader != null) {
-                    return i;
-                }
-            }
-            return -1;
-        }
-    }
-
-
-    private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
-        return LaunchServers(numServers, null);
-    }
-
-    /**
-     * This is a helper function for launching a set of servers
-     *
-     * @param numServers the number of servers
-     * @param tickTime A ticktime to pass to MainThread
-     * @return
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
-        int SERVER_COUNT = numServers;
-        Servers svrs = new Servers();
-        svrs.clientPorts = new int[SERVER_COUNT];
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            svrs.clientPorts[i] = PortAssignment.unique();
-            sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
-        }
-        String quorumCfgSection = sb.toString();
-
-        svrs.mt = new MainThread[SERVER_COUNT];
-        svrs.zk = new ZooKeeper[SERVER_COUNT];
-        for(int i = 0; i < SERVER_COUNT; i++) {
-            if (tickTime != null) {
-                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
-            } else {
-                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
-            }
-            svrs.mt[i].start();
-            svrs.restartClient(i, this);
-        }
-
-        waitForAll(svrs, States.CONNECTED);
-
-        return svrs;
-    }
-
     /**
      * Verify handling of bad quorum address
      */
@@ -1216,7 +1071,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             }
         }
     }
-    
+
     private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
         for (Proposal proposal : proposals.values()) {
             if (proposal.request.getHdr().getType() == type) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index ffc00f39a..22f989314 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -32,6 +32,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Properties;
 
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.WatchedEvent;
@@ -53,6 +56,22 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
 
     public static final int TIMEOUT = 5000;
 
+    protected Servers servers;
+    protected int numServers = 0;
+
+    @After
+    public void tearDown() throws Exception {
+        if (servers == null || servers.mt == null) {
+            LOG.info("No servers to shutdown!");
+            return;
+        }
+        for (int i = 0; i < numServers; i++) {
+            if (i < servers.mt.length) {
+                servers.mt[i].shutdown();
+            }
+        }
+    }
+
     public void process(WatchedEvent event) {
         // ignore for this test
     }
@@ -390,4 +409,129 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
         }
 
     }
+
+    // This class holds the servers and clients for those servers
+    protected static class Servers {
+        MainThread mt[];
+        ZooKeeper zk[];
+        int[] clientPorts;
+
+        public void shutDownAllServers() throws InterruptedException {
+            for (MainThread t: mt) {
+                t.shutdown();
+            }
+        }
+
+        public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
+            for (MainThread t : mt) {
+                if (!t.isAlive()) {
+                    t.start();
+                }
+            }
+            for (int i = 0; i < zk.length; i++) {
+                restartClient(i, watcher);
+            }
+        }
+
+        public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
+            if (zk[clientIndex] != null) {
+                zk[clientIndex].close();
+            }
+            zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
+        }
+
+        public int findLeader() {
+            for (int i = 0; i < mt.length; i++) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    return i;
+                }
+            }
+            return -1;
+        }
+    }
+
+    protected Servers LaunchServers(int numServers) throws IOException, InterruptedException {
+        return LaunchServers(numServers, null);
+    }
+
+    /**
+     * This is a helper function for launching a set of servers.
+     * @param numServers the number of servers
+     * @param tickTime a tick time to pass to MainThread, or null to not pass one
+     * @return the launched servers and their clients, once all clients are connected
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
+        int SERVER_COUNT = numServers;
+        QuorumPeerMainTest.Servers svrs = new QuorumPeerMainTest.Servers();
+        svrs.clientPorts = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            svrs.clientPorts[i] = PortAssignment.unique();
+            sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
+        }
+        String quorumCfgSection = sb.toString();
+
+        svrs.mt = new MainThread[SERVER_COUNT];
+        svrs.zk = new ZooKeeper[SERVER_COUNT];
+        for(int i = 0; i < SERVER_COUNT; i++) {
+            if (tickTime != null) {
+                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
+            } else {
+                svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
+            }
+            svrs.mt[i].start();
+            svrs.restartClient(i, this);
+        }
+
+        waitForAll(svrs, ZooKeeper.States.CONNECTED);
+
+        return svrs;
+    }
+
+    public static void waitForOne(ZooKeeper zk, ZooKeeper.States state) throws InterruptedException {
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
+        while (zk.getState() != state) {
+            if (iterations-- == 0) {
+                throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
+            }
+            Thread.sleep(500);
+        }
+    }
+
+    protected void waitForAll(Servers servers, ZooKeeper.States state) throws InterruptedException {
+        waitForAll(servers.zk, state);
+    }
+
+    public static void waitForAll(ZooKeeper[] zks, ZooKeeper.States state) throws InterruptedException {
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
+        boolean someoneNotConnected = true;
+        while (someoneNotConnected) {
+            if (iterations-- == 0) {
+                logStates(zks);
+                ClientBase.logAllStackTraces();
+                throw new RuntimeException("Waiting too long");
+            }
+
+            someoneNotConnected = false;
+            for (ZooKeeper zk : zks) {
+                if (zk.getState() != state) {
+                    someoneNotConnected = true;
+                    break;
+                }
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    public static void logStates(ZooKeeper[] zks) {
+        StringBuilder sbBuilder = new StringBuilder("Connection States: {");
+        for (int i = 0; i < zks.length; i++) {
+            sbBuilder.append(i + " : " + zks[i].getState() + ", ");
+        }
+        sbBuilder.append('}');
+        LOG.error(sbBuilder.toString());
+    }
+
 }