You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/28 08:43:30 UTC

[flink] branch master updated: [FLINK-28523][tests] Increase Zookeeper session timeouts

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e122dec43ed [FLINK-28523][tests] Increase Zookeeper session timeouts
e122dec43ed is described below

commit e122dec43edac9a03ef86eeacd02ebb5f429eb70
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 10:43:19 2022 +0200

    [FLINK-28523][tests] Increase Zookeeper session timeouts
---
 .../itcases/HAQueryableStateFsBackendITCase.java   |  4 +--
 .../HAQueryableStateRocksDBBackendITCase.java      |  4 +--
 .../runtime/leaderelection/LeaderElectionTest.java |  5 +--
 .../ZooKeeperLeaderElectionTest.java               |  5 +--
 .../ZooKeeperLeaderRetrievalTest.java              |  5 +--
 .../runtime/testutils/ZooKeeperTestUtils.java      | 34 ++++++++++++++++++++
 .../runtime/util/ZooKeeperUtilsTreeCacheTest.java  |  4 ++-
 .../runtime/zookeeper/ZooKeeperExtension.java      |  3 +-
 .../flink/runtime/zookeeper/ZooKeeperResource.java |  5 +--
 .../zookeeper/ZooKeeperTestEnvironment.java        |  3 +-
 .../EventTimeWindowCheckpointingITCase.java        |  6 ++--
 .../ResumeCheckpointManuallyITCase.java            | 37 ++++------------------
 .../ZooKeeperLeaderElectionITCase.java             |  2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java     |  5 +--
 14 files changed, 71 insertions(+), 51 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 47934761310..3739ed3256e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.apache.curator.test.TestingServer;
@@ -62,7 +63,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
 
     @BeforeClass
     public static void setup() throws Exception {
-        zkServer = new TestingServer();
+        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
 
         // we have to manage this manually because we have to create the ZooKeeper server
         // ahead of this
@@ -87,7 +88,6 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
 
         client.shutdownAndWait();
 
-        zkServer.stop();
         zkServer.close();
     }
 
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 2cbae94c787..508f151c0f0 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.apache.curator.test.TestingServer;
@@ -62,7 +63,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
 
     @BeforeClass
     public static void setup() throws Exception {
-        zkServer = new TestingServer();
+        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
 
         // we have to manage this manually because we have to create the ZooKeeper server
         // ahead of this
@@ -87,7 +88,6 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
 
         client.shutdownAndWait();
 
-        zkServer.stop();
         zkServer.close();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index a5614a1518b..9f9792c9f8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
 import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.testutils.TestingUtils;
@@ -185,7 +186,7 @@ public class LeaderElectionTest extends TestLogger {
         @Override
         public void setup(FatalErrorHandler fatalErrorHandler) throws Exception {
             try {
-                testingServer = new TestingServer();
+                testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
             } catch (Exception e) {
                 throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
             }
@@ -208,7 +209,7 @@ public class LeaderElectionTest extends TestLogger {
             }
 
             if (testingServer != null) {
-                testingServer.stop();
+                testingServer.close();
                 testingServer = null;
             }
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index c101c3dc492..d2359586d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandl
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -113,7 +114,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
     @Before
     public void before() {
         try {
-            testingServer = new TestingServer();
+            testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
         } catch (Exception e) {
             throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
         }
@@ -137,7 +138,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
         }
 
         if (testingServer != null) {
-            testingServer.stop();
+            testingServer.close();
             testingServer = null;
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index 31bca72ad67..b9a884e42a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingContender;
 import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -75,7 +76,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger {
 
     @Before
     public void before() throws Exception {
-        testingServer = new TestingServer();
+        testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
 
         config = new Configuration();
         config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
@@ -101,7 +102,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger {
         }
 
         if (testingServer != null) {
-            testingServer.stop();
+            testingServer.close();
 
             testingServer = null;
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 793c1f047e9..641616015b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -25,10 +25,14 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -37,6 +41,36 @@ public class ZooKeeperTestUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperTestUtils.class);
 
+    /**
+     * Creates a new {@link TestingServer}, setting additional configuration properties for
+     * stability purposes.
+     */
+    public static TestingServer createAndStartZookeeperTestingServer() throws Exception {
+        return new TestingServer(getZookeeperInstanceSpecWithIncreasedSessionTimeout(), true);
+    }
+
+    private static InstanceSpec getZookeeperInstanceSpecWithIncreasedSessionTimeout() {
+        // this gives us the default settings
+        final InstanceSpec instanceSpec = InstanceSpec.newInstanceSpec();
+
+        final Map<String, Object> properties = new HashMap<>();
+        properties.put("maxSessionTimeout", 60_000);
+
+        final boolean deleteDataDirectoryOnClose = true;
+
+        return new InstanceSpec(
+                instanceSpec.getDataDirectory(),
+                instanceSpec.getPort(),
+                instanceSpec.getElectionPort(),
+                instanceSpec.getQuorumPort(),
+                deleteDataDirectoryOnClose,
+                instanceSpec.getServerId(),
+                instanceSpec.getTickTime(),
+                instanceSpec.getMaxClientCnxns(),
+                properties,
+                instanceSpec.getHostname());
+    }
+
     /**
      * Creates a configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
      *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java
index c4922534121..d63002a839a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
@@ -61,7 +62,8 @@ public class ZooKeeperUtilsTreeCacheTest extends TestLogger {
     @Before
     public void setUp() throws Exception {
         closer = Closer.create();
-        final TestingServer testingServer = closer.register(new TestingServer());
+        final TestingServer testingServer =
+                closer.register(ZooKeeperTestUtils.createAndStartZookeeperTestingServer());
 
         Configuration configuration = new Configuration();
         configuration.set(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
index 0243bd00879..662a6a47139 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.testutils.CustomExtension;
 import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -52,7 +53,7 @@ public class ZooKeeperExtension implements CustomExtension {
     @Override
     public void before(ExtensionContext context) throws Exception {
         close();
-        zooKeeperServer = new TestingServer(true);
+        zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
index 4253562de61..301b7b8a850 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.zookeeper;
 
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.curator.test.TestingServer;
@@ -48,12 +49,12 @@ public class ZooKeeperResource extends ExternalResource {
     @Override
     protected void before() throws Throwable {
         terminateZooKeeperServer();
-        zooKeeperServer = new TestingServer(true);
+        zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
     }
 
     private void terminateZooKeeperServer() throws IOException {
         if (zooKeeperServer != null) {
-            zooKeeperServer.stop();
+            zooKeeperServer.close();
             zooKeeperServer = null;
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index c8d7edf0f95..f89ce707acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.zookeeper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ExitJVMFatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
@@ -61,7 +62,7 @@ public class ZooKeeperTestEnvironment {
 
         try {
             if (numberOfZooKeeperQuorumPeers == 1) {
-                zooKeeperServer = new TestingServer(true);
+                zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
                 zooKeeperCluster = null;
 
                 conf.setString(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 23a8327e0ff..786e4623336 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
@@ -162,8 +163,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
         // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
         StateBackendEnum stateBackendEnum = getStateBackend();
         if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
-            zkServer = new TestingServer();
-            zkServer.start();
+            zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
         }
 
         Configuration config = createClusterConfig();
@@ -261,7 +261,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
         }
 
         if (zkServer != null) {
-            zkServer.stop();
+            zkServer.close();
             zkServer = null;
         }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index dcdbdee7752..a4851cf51d0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -152,9 +153,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 
     @Test
     public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
-        TestingServer zkServer = new TestingServer();
-        zkServer.start();
-        try {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
             final File checkpointDir = temporaryFolder.newFolder();
             testExternalizedCheckpoints(
                     checkpointDir,
@@ -162,16 +161,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                     createRocksDBStateBackend(checkpointDir, true),
                     false,
                     restoreMode);
-        } finally {
-            zkServer.stop();
         }
     }
 
     @Test
     public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
-        TestingServer zkServer = new TestingServer();
-        zkServer.start();
-        try {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
             final File checkpointDir = temporaryFolder.newFolder();
             testExternalizedCheckpoints(
                     checkpointDir,
@@ -179,17 +174,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                     createRocksDBStateBackend(checkpointDir, false),
                     false,
                     restoreMode);
-        } finally {
-            zkServer.stop();
         }
     }
 
     @Test
     public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper()
             throws Exception {
-        TestingServer zkServer = new TestingServer();
-        zkServer.start();
-        try {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
             final File checkpointDir = temporaryFolder.newFolder();
             testExternalizedCheckpoints(
                     checkpointDir,
@@ -197,17 +188,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                     createRocksDBStateBackend(checkpointDir, true),
                     true,
                     restoreMode);
-        } finally {
-            zkServer.stop();
         }
     }
 
     @Test
     public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper()
             throws Exception {
-        TestingServer zkServer = new TestingServer();
-        zkServer.start();
-        try {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
             final File checkpointDir = temporaryFolder.newFolder();
             testExternalizedCheckpoints(
                     checkpointDir,
@@ -215,16 +202,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                     createRocksDBStateBackend(checkpointDir, false),
                     true,
                     restoreMode);
-        } finally {
-            zkServer.stop();
         }
     }
 
     @Test
     public void testExternalizedFSCheckpointsZookeeper() throws Exception {
-        TestingServer zkServer = new TestingServer();
-        zkServer.start();
-        try {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
             final File checkpointDir = temporaryFolder.newFolder();
             testExternalizedCheckpoints(
                     checkpointDir,
@@ -232,16 +215,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                     createFsStateBackend(checkpointDir),
                     false,
                     restoreMode);
-        } finally {
-            zkServer.stop();
         }
     }
 
     @Test
     public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception {
-        TestingServer zkServer = new TestingServer();
-        zkServer.start();
-        try {
+        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
             final File checkpointDir = temporaryFolder.newFolder();
             testExternalizedCheckpoints(
                     checkpointDir,
@@ -249,8 +228,6 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
                     createFsStateBackend(checkpointDir),
                     true,
                     restoreMode);
-        } finally {
-            zkServer.stop();
         }
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 2ebd53564e4..771eb0afdf0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -70,7 +70,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
     @BeforeClass
     public static void setup() throws Exception {
-        zkServer = new TestingServer(true);
+        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
     }
 
     @AfterClass
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 804a6d0eb8c..92504a59910 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
@@ -110,7 +111,7 @@ class YARNHighAvailabilityITCase extends YarnTestBase {
 
     @BeforeAll
     static void setup(@TempDir File tempDir) throws Exception {
-        zkServer = new TestingServer();
+        zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
 
         storageDir = tempDir.getAbsolutePath();
 
@@ -128,7 +129,7 @@ class YARNHighAvailabilityITCase extends YarnTestBase {
             YarnTestBase.teardown();
         } finally {
             if (zkServer != null) {
-                zkServer.stop();
+                zkServer.close();
                 zkServer = null;
             }
         }