You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by dd...@apache.org on 2021/06/13 18:54:52 UTC

[zookeeper] branch master updated: ZOOKEEPER-4312: ZooKeeperServerEmbedded: enhance server start/stop for testability

This is an automated email from the ASF dual-hosted git repository.

ddiederen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e6e15a  ZOOKEEPER-4312: ZooKeeperServerEmbedded: enhance server start/stop for testability
5e6e15a is described below

commit 5e6e15ac40cfd2cadac3e718e494a3c13b934b8d
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sun Jun 13 18:53:47 2021 +0000

    ZOOKEEPER-4312: ZooKeeperServerEmbedded: enhance server start/stop for testability
    
    see https://issues.apache.org/jira/browse/ZOOKEEPER-4312
    
    ZooKeeperServerEmbedded works well for running ZooKeeper but it lacks support for a few little features in order to use it for tests.
    
    I saw these problems while working on the port of Curator Testing Server to ZooKeeperServerEmbedded.
    
    There is no wait to wait for the server to be up-and-running
    When you "close()" the server, it does not wait for the ports to be closed
    There is no wait to have the ConnectString for the server
    
    Author: Enrico Olivelli <eo...@apache.org>
    
    Reviewers: Damien Diederen <dd...@apache.org>
    
    Closes #1710 from eolivelli/impl/embedded-2
---
 .../zookeeper/server/ZooKeeperServerMain.java      | 42 ++++++++++-----
 .../server/embedded/ZooKeeperServerEmbedded.java   | 16 ++++++
 .../embedded/ZooKeeperServerEmbeddedImpl.java      | 60 ++++++++++++++++++++--
 .../apache/zookeeper/server/quorum/QuorumPeer.java |  1 -
 .../ZookeeperServerClusterMutualAuthTest.java      |  6 +--
 .../embedded/ZookeeperServerClusterTest.java       |  6 +--
 .../embedded/ZookeeperServerEmbeddedTest.java      | 24 ++++++++-
 .../embedded/ZookeeperServerSslEmbeddedTest.java   |  4 +-
 8 files changed, 133 insertions(+), 26 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
index c438f18..7bd30ba 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
@@ -178,6 +178,8 @@ public class ZooKeeperServerMain {
             containerManager.start();
             ZKAuditProvider.addZKStartStopAuditLog();
 
+            serverStarted();
+
             // Watch status of ZooKeeper server. It will do a graceful shutdown
             // if the server is not running or hits an internal error.
             shutdownLatch.await();
@@ -247,21 +249,35 @@ public class ZooKeeperServerMain {
      */
     public void close() {
         ServerCnxnFactory primaryCnxnFactory = this.cnxnFactory;
-        if (primaryCnxnFactory == null) {
-            // in case of pure TLS we can hook into secureCnxnFactory
-            primaryCnxnFactory = secureCnxnFactory;
-        }
-        if (primaryCnxnFactory == null || primaryCnxnFactory.getZooKeeperServer() == null) {
-            return;
-        }
-        ZooKeeperServerShutdownHandler zkShutdownHandler = primaryCnxnFactory.getZooKeeperServer().getZkShutdownHandler();
-        zkShutdownHandler.handle(ZooKeeperServer.State.SHUTDOWN);
+        ServerCnxnFactory secondaryCnxnFactory = this.secureCnxnFactory;
         try {
-            // ServerCnxnFactory will call the shutdown
-            primaryCnxnFactory.join();
-        } catch (InterruptedException ex) {
-            Thread.currentThread().interrupt();
+            if (primaryCnxnFactory == null) {
+                // in case of pure TLS we can hook into secureCnxnFactory
+                primaryCnxnFactory = secondaryCnxnFactory;
+            }
+            if (primaryCnxnFactory == null || primaryCnxnFactory.getZooKeeperServer() == null) {
+                LOG.info("Connection factory did not start");
+                return;
+            }
+            ZooKeeperServerShutdownHandler zkShutdownHandler = primaryCnxnFactory.getZooKeeperServer().getZkShutdownHandler();
+            zkShutdownHandler.handle(ZooKeeperServer.State.SHUTDOWN);
+            try {
+                // ServerCnxnFactory will call the shutdown
+                primaryCnxnFactory.join();
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
+        } finally {
+            // ensure that we are closing the sockets
+            if (primaryCnxnFactory != null) {
+                primaryCnxnFactory.shutdown();
+            }
+            if (secondaryCnxnFactory != null) {
+                secondaryCnxnFactory.shutdown();
+            }
         }
     }
 
+    protected void serverStarted() {
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbedded.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbedded.java
index b9d0a30..a99bdb4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbedded.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbedded.java
@@ -110,6 +110,22 @@ public interface ZooKeeperServerEmbedded extends AutoCloseable {
     void start() throws Exception;
 
     /**
+     * Start the server
+     * @param startupTimeout time to wait in millis for the server to start
+     * @throws Exception
+     */
+    void start(long startupTimeout) throws Exception;
+
+    /**
+     * Get a connection string useful for the client.
+     * @return the connection string
+     * @throws Exception in case the connection string is not available
+     */
+    String getConnectionString() throws Exception;
+
+    String getSecureConnectionString() throws Exception;
+
+    /**
      * Shutdown gracefully the server and wait for resources to be released.
      */
     @Override
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java
index cfd3abf..ec6ae63 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java
@@ -5,6 +5,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.security.sasl.SaslException;
 import org.apache.zookeeper.server.DatadirCleanupManager;
 import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.ServerConfig;
@@ -72,6 +76,11 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
 
     @Override
     public void start() throws Exception {
+        start(Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void start(long startupTimeout) throws Exception {
         switch (exitHandler) {
             case EXIT:
                 ServiceUtils.setSystemExitProcedure(ServiceUtils.SYSTEM_EXIT);
@@ -83,12 +92,23 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
                 ServiceUtils.setSystemExitProcedure(ServiceUtils.SYSTEM_EXIT);
                 break;
         }
-
+        final CompletableFuture<String> started = new CompletableFuture<>();
 
         if (config.getServers().size() > 1 || config.isDistributed()) {
             LOG.info("Running ZK Server in single Quorum MODE");
 
-            maincluster = new QuorumPeerMain();
+            maincluster = new QuorumPeerMain() {
+                protected QuorumPeer getQuorumPeer() throws SaslException {
+                    return new QuorumPeer() {
+                        @Override
+                        public void start() {
+                            super.start();
+                            LOG.info("ZK Server {} started", this);
+                            started.complete(null);
+                        }
+                    };
+                }
+            };
 
             // Start and schedule the the purge task
             purgeMgr = new DatadirCleanupManager(config
@@ -118,7 +138,13 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
             thread.start();
         } else {
             LOG.info("Running ZK Server in single STANDALONE MODE");
-            mainsingle = new ZooKeeperServerMain();
+            mainsingle = new ZooKeeperServerMain() {
+                @Override
+                public void serverStarted() {
+                    LOG.info("ZK Server started");
+                    started.complete(null);
+                }
+            };
             purgeMgr = new DatadirCleanupManager(config
                     .getDataDir(), config.getDataLogDir(), config
                     .getSnapRetainCount(), config.getPurgeInterval());
@@ -146,6 +172,34 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded {
             };
             thread.start();
         }
+
+        try {
+            started.get(startupTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException err) {
+            LOG.info("Startup timed out, trying to close");
+            close();
+            throw err;
+        }
+    }
+
+    @Override
+    public String getConnectionString() {
+        if (config.getClientPortAddress() != null) {
+            String raw = config.getClientPortAddress().getHostString() + ":" + config.getClientPortAddress().getPort();
+            return raw.replace("0.0.0.0", "localhost");
+        } else {
+            throw new IllegalStateException("No client address is configured");
+        }
+    }
+
+    @Override
+    public String getSecureConnectionString() {
+        if (config.getSecureClientPortAddress() != null) {
+            String raw = config.getSecureClientPortAddress().getHostString() + ":" + config.getSecureClientPortAddress().getPort();
+            return raw.replace("0.0.0.0", "localhost");
+        } else {
+            throw new IllegalStateException("No client address is configured");
+        }
     }
 
     @Override
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 5dea201..18e97bb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -1135,7 +1135,6 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             adminServer.start();
         } catch (AdminServerException e) {
             LOG.warn("Problem starting AdminServer", e);
-            System.out.println(e);
         }
         startLeaderElection();
         startJvmPauseMonitor();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterMutualAuthTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterMutualAuthTest.java
index 923d888..1022c25 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterMutualAuthTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterMutualAuthTest.java
@@ -119,9 +119,9 @@ public class ZookeeperServerClusterMutualAuthTest {
             zkServer2.start();
             zkServer3.start();
 
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientport1, 60000));
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientport2, 60000));
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientport3, 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer1.getConnectionString(), 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer2.getConnectionString(), 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer3.getConnectionString(), 60000));
 
             for (int i = 0; i < 100; i++) {
                 ZookeeperServeInfo.ServerInfo status = ZookeeperServeInfo.getStatus("ReplicatedServer*");
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterTest.java
index 090c9fc..dfbdfc3 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerClusterTest.java
@@ -101,9 +101,9 @@ public class ZookeeperServerClusterTest {
             zkServer2.start();
             zkServer3.start();
 
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientport1, 60000));
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientport2, 60000));
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientport3, 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer1.getConnectionString(), 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer2.getConnectionString(), 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer3.getConnectionString(), 60000));
             for (int i = 0; i < 100; i++) {
                 ZookeeperServeInfo.ServerInfo status = ZookeeperServeInfo.getStatus("ReplicatedServer*");
                 System.out.println("status:" + status);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java
index 277a6a4..d9868b2 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java
@@ -59,7 +59,7 @@ public class ZookeeperServerEmbeddedTest {
                 .exitHandler(ExitHandler.LOG_ONLY)
                 .build()) {
             zkServer.start();
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientPort, 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer.getConnectionString(), 60000));
             for (int i = 0; i < 100; i++) {
                 ZookeeperServeInfo.ServerInfo status = ZookeeperServeInfo.getStatus("StandaloneServer*");
                 if (status.isLeader() && status.isStandaloneMode()) {
@@ -71,6 +71,28 @@ public class ZookeeperServerEmbeddedTest {
             assertTrue(status.isLeader());
             assertTrue(status.isStandaloneMode());
         }
+
+        // restart (all ports should be closed and the restart should always work)
+        try (ZooKeeperServerEmbedded zkServer = ZooKeeperServerEmbedded
+                .builder()
+                .baseDir(baseDir)
+                .configuration(configZookeeper)
+                .exitHandler(ExitHandler.LOG_ONLY)
+                .build()) {
+            zkServer.start();
+            assertTrue(ClientBase.waitForServerUp(zkServer.getConnectionString(), 60000));
+            for (int i = 0; i < 100; i++) {
+                ZookeeperServeInfo.ServerInfo status = ZookeeperServeInfo.getStatus("StandaloneServer*");
+                if (status.isLeader() && status.isStandaloneMode()) {
+                    break;
+                }
+                Thread.sleep(100);
+            }
+            ZookeeperServeInfo.ServerInfo status = ZookeeperServeInfo.getStatus("StandaloneServer*");
+            assertTrue(status.isLeader());
+            assertTrue(status.isStandaloneMode());
+        }
+
     }
 
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerSslEmbeddedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerSslEmbeddedTest.java
index e7a4626..4611e4c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerSslEmbeddedTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerSslEmbeddedTest.java
@@ -87,7 +87,7 @@ public class ZookeeperServerSslEmbeddedTest {
                 .exitHandler(ExitHandler.LOG_ONLY)
                 .build()) {
             zkServer.start();
-            assertTrue(ClientBase.waitForServerUp("localhost:" + clientPort, 60000));
+            assertTrue(ClientBase.waitForServerUp(zkServer.getConnectionString(), 60000));
             for (int i = 0; i < 100; i++) {
                 ZookeeperServeInfo.ServerInfo status = ZookeeperServeInfo.getStatus("StandaloneServer*");
                 if (status.isLeader() && status.isStandaloneMode()) {
@@ -104,7 +104,7 @@ public class ZookeeperServerSslEmbeddedTest {
             zKClientConfig.setProperty("zookeeper.client.secure", "true");
             // only netty supports TLS
             zKClientConfig.setProperty("zookeeper.clientCnxnSocket", org.apache.zookeeper.ClientCnxnSocketNetty.class.getName());
-            try (ZooKeeper zk = new ZooKeeper("localhost:" + clientSecurePort, 60000, (WatchedEvent event) -> {
+            try (ZooKeeper zk = new ZooKeeper(zkServer.getSecureConnectionString(), 60000, (WatchedEvent event) -> {
                 switch (event.getState()) {
                     case SyncConnected:
                         l.countDown();