You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/28 08:43:30 UTC
[flink] branch master updated: [FLINK-28523][tests] Increase Zookeeper session timeouts
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e122dec43ed [FLINK-28523][tests] Increase Zookeeper session timeouts
e122dec43ed is described below
commit e122dec43edac9a03ef86eeacd02ebb5f429eb70
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 10:43:19 2022 +0200
[FLINK-28523][tests] Increase Zookeeper session timeouts
---
.../itcases/HAQueryableStateFsBackendITCase.java | 4 +--
.../HAQueryableStateRocksDBBackendITCase.java | 4 +--
.../runtime/leaderelection/LeaderElectionTest.java | 5 +--
.../ZooKeeperLeaderElectionTest.java | 5 +--
.../ZooKeeperLeaderRetrievalTest.java | 5 +--
.../runtime/testutils/ZooKeeperTestUtils.java | 34 ++++++++++++++++++++
.../runtime/util/ZooKeeperUtilsTreeCacheTest.java | 4 ++-
.../runtime/zookeeper/ZooKeeperExtension.java | 3 +-
.../flink/runtime/zookeeper/ZooKeeperResource.java | 5 +--
.../zookeeper/ZooKeeperTestEnvironment.java | 3 +-
.../EventTimeWindowCheckpointingITCase.java | 6 ++--
.../ResumeCheckpointManuallyITCase.java | 37 ++++------------------
.../ZooKeeperLeaderElectionITCase.java | 2 +-
.../flink/yarn/YARNHighAvailabilityITCase.java | 5 +--
14 files changed, 71 insertions(+), 51 deletions(-)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 47934761310..3739ed3256e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.curator.test.TestingServer;
@@ -62,7 +63,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
@BeforeClass
public static void setup() throws Exception {
- zkServer = new TestingServer();
+ zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
@@ -87,7 +88,6 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
client.shutdownAndWait();
- zkServer.stop();
zkServer.close();
}
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 2cbae94c787..508f151c0f0 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.curator.test.TestingServer;
@@ -62,7 +63,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
@BeforeClass
public static void setup() throws Exception {
- zkServer = new TestingServer();
+ zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
@@ -87,7 +88,6 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
client.shutdownAndWait();
- zkServer.stop();
zkServer.close();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index a5614a1518b..9f9792c9f8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.testutils.TestingUtils;
@@ -185,7 +186,7 @@ public class LeaderElectionTest extends TestLogger {
@Override
public void setup(FatalErrorHandler fatalErrorHandler) throws Exception {
try {
- testingServer = new TestingServer();
+ testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
} catch (Exception e) {
throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
}
@@ -208,7 +209,7 @@ public class LeaderElectionTest extends TestLogger {
}
if (testingServer != null) {
- testingServer.stop();
+ testingServer.close();
testingServer = null;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index c101c3dc492..d2359586d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandl
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -113,7 +114,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Before
public void before() {
try {
- testingServer = new TestingServer();
+ testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
} catch (Exception e) {
throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
}
@@ -137,7 +138,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
}
if (testingServer != null) {
- testingServer.stop();
+ testingServer.close();
testingServer = null;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index 31bca72ad67..b9a884e42a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -75,7 +76,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger {
@Before
public void before() throws Exception {
- testingServer = new TestingServer();
+ testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
@@ -101,7 +102,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger {
}
if (testingServer != null) {
- testingServer.stop();
+ testingServer.close();
testingServer = null;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 793c1f047e9..641616015b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -25,10 +25,14 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -37,6 +41,36 @@ public class ZooKeeperTestUtils {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperTestUtils.class);
+ /**
+ * Creates a new {@link TestingServer}, setting additional configuration properties for
+ * stability purposes.
+ */
+ public static TestingServer createAndStartZookeeperTestingServer() throws Exception {
+ return new TestingServer(getZookeeperInstanceSpecWithIncreasedSessionTimeout(), true);
+ }
+
+ private static InstanceSpec getZookeeperInstanceSpecWithIncreasedSessionTimeout() {
+ // this gives us the default settings
+ final InstanceSpec instanceSpec = InstanceSpec.newInstanceSpec();
+
+ final Map<String, Object> properties = new HashMap<>();
+ properties.put("maxSessionTimeout", 60_000);
+
+ final boolean deleteDataDirectoryOnClose = true;
+
+ return new InstanceSpec(
+ instanceSpec.getDataDirectory(),
+ instanceSpec.getPort(),
+ instanceSpec.getElectionPort(),
+ instanceSpec.getQuorumPort(),
+ deleteDataDirectoryOnClose,
+ instanceSpec.getServerId(),
+ instanceSpec.getTickTime(),
+ instanceSpec.getMaxClientCnxns(),
+ properties,
+ instanceSpec.getHostname());
+ }
+
/**
* Creates a configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
*
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java
index c4922534121..d63002a839a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTreeCacheTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
@@ -61,7 +62,8 @@ public class ZooKeeperUtilsTreeCacheTest extends TestLogger {
@Before
public void setUp() throws Exception {
closer = Closer.create();
- final TestingServer testingServer = closer.register(new TestingServer());
+ final TestingServer testingServer =
+ closer.register(ZooKeeperTestUtils.createAndStartZookeeperTestingServer());
Configuration configuration = new Configuration();
configuration.set(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
index 0243bd00879..662a6a47139 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.Preconditions;
@@ -52,7 +53,7 @@ public class ZooKeeperExtension implements CustomExtension {
@Override
public void before(ExtensionContext context) throws Exception {
close();
- zooKeeperServer = new TestingServer(true);
+ zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
index 4253562de61..301b7b8a850 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.zookeeper;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.Preconditions;
import org.apache.curator.test.TestingServer;
@@ -48,12 +49,12 @@ public class ZooKeeperResource extends ExternalResource {
@Override
protected void before() throws Throwable {
terminateZooKeeperServer();
- zooKeeperServer = new TestingServer(true);
+ zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
}
private void terminateZooKeeperServer() throws IOException {
if (zooKeeperServer != null) {
- zooKeeperServer.stop();
+ zooKeeperServer.close();
zooKeeperServer = null;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index c8d7edf0f95..f89ce707acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.zookeeper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ExitJVMFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -61,7 +62,7 @@ public class ZooKeeperTestEnvironment {
try {
if (numberOfZooKeeperQuorumPeers == 1) {
- zooKeeperServer = new TestingServer(true);
+ zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
zooKeeperCluster = null;
conf.setString(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 23a8327e0ff..786e4623336 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
@@ -162,8 +163,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
StateBackendEnum stateBackendEnum = getStateBackend();
if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
- zkServer = new TestingServer();
- zkServer.start();
+ zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
}
Configuration config = createClusterConfig();
@@ -261,7 +261,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
}
if (zkServer != null) {
- zkServer.stop();
+ zkServer.close();
zkServer = null;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index dcdbdee7752..a4851cf51d0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -152,9 +153,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
@Test
public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
- TestingServer zkServer = new TestingServer();
- zkServer.start();
- try {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
@@ -162,16 +161,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
createRocksDBStateBackend(checkpointDir, true),
false,
restoreMode);
- } finally {
- zkServer.stop();
}
}
@Test
public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
- TestingServer zkServer = new TestingServer();
- zkServer.start();
- try {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
@@ -179,17 +174,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
createRocksDBStateBackend(checkpointDir, false),
false,
restoreMode);
- } finally {
- zkServer.stop();
}
}
@Test
public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper()
throws Exception {
- TestingServer zkServer = new TestingServer();
- zkServer.start();
- try {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
@@ -197,17 +188,13 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
createRocksDBStateBackend(checkpointDir, true),
true,
restoreMode);
- } finally {
- zkServer.stop();
}
}
@Test
public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper()
throws Exception {
- TestingServer zkServer = new TestingServer();
- zkServer.start();
- try {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
@@ -215,16 +202,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
createRocksDBStateBackend(checkpointDir, false),
true,
restoreMode);
- } finally {
- zkServer.stop();
}
}
@Test
public void testExternalizedFSCheckpointsZookeeper() throws Exception {
- TestingServer zkServer = new TestingServer();
- zkServer.start();
- try {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
@@ -232,16 +215,12 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
createFsStateBackend(checkpointDir),
false,
restoreMode);
- } finally {
- zkServer.stop();
}
}
@Test
public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception {
- TestingServer zkServer = new TestingServer();
- zkServer.start();
- try {
+ try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer()) {
final File checkpointDir = temporaryFolder.newFolder();
testExternalizedCheckpoints(
checkpointDir,
@@ -249,8 +228,6 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
createFsStateBackend(checkpointDir),
true,
restoreMode);
- } finally {
- zkServer.stop();
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 2ebd53564e4..771eb0afdf0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -70,7 +70,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
@BeforeClass
public static void setup() throws Exception {
- zkServer = new TestingServer(true);
+ zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
}
@AfterClass
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 804a6d0eb8c..92504a59910 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
@@ -110,7 +111,7 @@ class YARNHighAvailabilityITCase extends YarnTestBase {
@BeforeAll
static void setup(@TempDir File tempDir) throws Exception {
- zkServer = new TestingServer();
+ zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
storageDir = tempDir.getAbsolutePath();
@@ -128,7 +129,7 @@ class YARNHighAvailabilityITCase extends YarnTestBase {
YarnTestBase.teardown();
} finally {
if (zkServer != null) {
- zkServer.stop();
+ zkServer.close();
zkServer = null;
}
}