You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/24 09:07:57 UTC
[flink] 01/02: [FLINK-26741][runtime] Changes method signature of CheckpointIDCounter.shutdown
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 34133b9d040d22f76710b8da987ad848f63063d3
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Mon Mar 21 14:15:27 2022 +0100
[FLINK-26741][runtime] Changes method signature of CheckpointIDCounter.shutdown
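The method now returns a CompletableFuture<Void> that completes once the shutdown has finished,
including any asynchronous removal of the counter from ZooKeeper or from the Kubernetes ConfigMap,
so callers can wait for it and handle failures.
This commit also adds idempotency and failure-handling tests for the ZooKeeper and Kubernetes
implementations.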
---
.../KubernetesCheckpointIDCounter.java | 31 +++--
.../KubernetesCheckpointIDCounterTest.java | 138 +++++++++++++++++++++
.../KubernetesHighAvailabilityTestBase.java | 4 +
.../runtime/checkpoint/CheckpointIDCounter.java | 5 +-
.../checkpoint/DeactivatedCheckpointIDCounter.java | 7 +-
.../checkpoint/StandaloneCheckpointIDCounter.java | 6 +-
.../checkpoint/ZooKeeperCheckpointIDCounter.java | 58 ++++++++-
.../cleanup/CheckpointResourcesCleanupRunner.java | 2 +-
.../flink/runtime/scheduler/SchedulerBase.java | 2 +-
.../scheduler/adaptive/AdaptiveScheduler.java | 2 +-
.../CheckpointCoordinatorTriggeringTest.java | 5 +-
.../checkpoint/CheckpointIDCounterTestBase.java | 8 +-
.../checkpoint/TestingCheckpointIDCounter.java | 27 ++--
.../ZooKeeperCheckpointIDCounterITCase.java | 97 +++++++++++++--
.../runtime/scheduler/DefaultSchedulerTest.java | 21 +++-
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 3 +-
16 files changed, 362 insertions(+), 54 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
index a08146a..8eac11e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_COUNTER_KEY;
@@ -71,25 +73,32 @@ public class KubernetesCheckpointIDCounter implements CheckpointIDCounter {
}
@Override
- public void shutdown(JobStatus jobStatus) {
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
if (!running) {
- return;
+ return FutureUtils.completedVoidFuture();
}
running = false;
LOG.info("Shutting down.");
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Removing counter from ConfigMap {}", configMapName);
- kubeClient.checkAndUpdateConfigMap(
- configMapName,
- configMap -> {
- if (isValidOperation(configMap)) {
- configMap.getData().remove(CHECKPOINT_COUNTER_KEY);
- return Optional.of(configMap);
- }
- return Optional.empty();
- });
+ return kubeClient
+ .checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ configMap.getData().remove(CHECKPOINT_COUNTER_KEY);
+ return Optional.of(configMap);
+ }
+ return Optional.empty();
+ })
+ // checkAndUpdateConfigMap only returns false if the callback returned an empty
+ // ConfigMap. We don't want to continue the cleanup in that case, i.e. we can
+ // ignore the return value
+ .thenApply(valueChanged -> null);
}
+
+ return FutureUtils.completedVoidFuture();
}
private boolean isValidOperation(KubernetesConfigMap configMap) {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
index d6d4313..30b8875 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
@@ -18,13 +18,19 @@
package org.apache.flink.kubernetes.highavailability;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.junit.Test;
+import java.util.concurrent.CompletionException;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
/** Tests for {@link KubernetesCheckpointIDCounter} operations. */
@@ -51,6 +57,138 @@ public class KubernetesCheckpointIDCounterTest extends KubernetesHighAvailabilit
}
@Test
+ public void testShutdown() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ checkpointIDCounter.setCount(100L);
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+ .get(Constants.CHECKPOINT_COUNTER_KEY),
+ is("100"));
+
+ checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+ .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ checkpointIDCounter.setCount(100L);
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+ .get(Constants.CHECKPOINT_COUNTER_KEY),
+ is("100"));
+
+ checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+ .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(true));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testIdempotentShutdown() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+ .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+
+ checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+ .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+
+ // a second shutdown should work without causing any errors
+ checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testShutdownFailureDueToMissingConfigMap() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ // deleting the ConfigMap from outside of the CheckpointIDCounter while
+ // still using the counter (which is stored as an entry in the
+ // ConfigMap) causes an unexpected failure which we want to simulate
+ // here
+ flinkKubeClient.deleteConfigMap(LEADER_CONFIGMAP_NAME);
+
+ assertThrows(
+ CompletionException.class,
+ () -> checkpointIDCounter.shutdown(JobStatus.FINISHED).get());
+
+ // fixing the internal issue should make the shutdown succeed again
+ KubernetesUtils.createConfigMapIfItDoesNotExist(
+ flinkKubeClient, LEADER_CONFIGMAP_NAME, getClusterId());
+ checkpointIDCounter.shutdown(JobStatus.FINISHED).get();
+ });
+ }
+ };
+ }
+
+ @Test
public void testGetAndIncrementWithNoLeadership() throws Exception {
new Context() {
{
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
index 5310a8b..543a589 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
@@ -118,6 +118,10 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
return kubernetesTestFixture.createFlinkKubeClientBuilder();
}
+ String getClusterId() {
+ return CLUSTER_ID;
+ }
+
KubernetesConfigMap getLeaderConfigMap() {
return kubernetesTestFixture.getLeaderConfigMap();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 81e0471..bb4a395 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
+import java.util.concurrent.CompletableFuture;
+
/** A checkpoint ID counter. */
public interface CheckpointIDCounter {
int INITIAL_CHECKPOINT_ID = 1;
@@ -34,8 +36,9 @@ public interface CheckpointIDCounter {
* or kept.
*
* @param jobStatus Job state on shut down
+ * @return The {@code CompletableFuture} holding the result of the shutdown operation.
*/
- void shutdown(JobStatus jobStatus) throws Exception;
+ CompletableFuture<Void> shutdown(JobStatus jobStatus);
/**
* Atomically increments the current checkpoint ID.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
index ca86159..7d67010 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointIDCounter.java
@@ -19,6 +19,9 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.concurrent.CompletableFuture;
/**
* This class represents a {@link CheckpointIDCounter} if checkpointing is deactivated.
@@ -32,7 +35,9 @@ public enum DeactivatedCheckpointIDCounter implements CheckpointIDCounter {
public void start() throws Exception {}
@Override
- public void shutdown(JobStatus jobStatus) throws Exception {}
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ return FutureUtils.completedVoidFuture();
+ }
@Override
public long getAndIncrement() throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 30b6539..8240035 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.concurrent.FutureUtils;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -37,7 +39,9 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
public void start() throws Exception {}
@Override
- public void shutdown(JobStatus jobStatus) throws Exception {}
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ return FutureUtils.completedVoidFuture();
+ }
@Override
public long getAndIncrement() throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 9c649e6..6576dde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -22,18 +22,27 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.shaded.curator5.com.google.common.collect.Sets;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
+import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CuratorEventType;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -101,22 +110,65 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
}
@Override
- public void shutdown(JobStatus jobStatus) throws Exception {
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
synchronized (startStopLock) {
if (isStarted) {
LOG.info("Shutting down.");
- sharedCount.close();
+ try {
+ sharedCount.close();
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
client.getConnectionStateListenable().removeListener(connectionStateListener);
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Removing {} from ZooKeeper", counterPath);
- client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+ try {
+ final CompletableFuture<Void> deletionFuture = new CompletableFuture<>();
+ client.delete()
+ .inBackground(
+ (curatorFramework, curatorEvent) ->
+ handleDeletionOfCounterPath(
+ curatorEvent, deletionFuture))
+ .forPath(counterPath);
+ return deletionFuture;
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
}
isStarted = false;
}
}
+
+ return FutureUtils.completedVoidFuture();
+ }
+
+ private void handleDeletionOfCounterPath(
+ CuratorEvent curatorEvent, CompletableFuture<Void> deletionFuture) {
+ Preconditions.checkArgument(
+ curatorEvent.getType() == CuratorEventType.DELETE,
+ "An unexpected CuratorEvent was monitored: " + curatorEvent.getType());
+ Preconditions.checkArgument(
+ counterPath.endsWith(curatorEvent.getPath()),
+ "An unexpected path was selected for deletion: " + curatorEvent.getPath());
+
+ final KeeperException.Code eventCode =
+ KeeperException.Code.get(curatorEvent.getResultCode());
+ if (Sets.immutableEnumSet(KeeperException.Code.OK, KeeperException.Code.NONODE)
+ .contains(eventCode)) {
+ deletionFuture.complete(null);
+ } else {
+ final String namespacedCounterPath =
+ ZooKeeperUtils.generateZookeeperPath(client.getNamespace(), counterPath);
+ deletionFuture.completeExceptionally(
+ new FlinkException(
+ String.format(
+ "An error occurred while shutting down the CheckpointIDCounter in path '%s'.",
+ namespacedCounterPath),
+ KeeperException.create(eventCode, namespacedCounterPath)));
+ }
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 4ae7678..c97f864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -129,7 +129,7 @@ public class CheckpointResourcesCleanupRunner implements JobManagerRunner {
}
try {
- checkpointIDCounter.shutdown(getJobStatus());
+ checkpointIDCounter.shutdown(getJobStatus()).get();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c997cc2..0ca2233 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -245,7 +245,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
}
try {
- checkpointIdCounter.shutdown(jobStatus);
+ checkpointIdCounter.shutdown(jobStatus).get();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 999b2af..562c17f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -463,7 +463,7 @@ public class AdaptiveScheduler
}
try {
- checkpointIdCounter.shutdown(terminalState);
+ checkpointIdCounter.shutdown(terminalState).get();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index ddf446b..643c09d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
@@ -835,7 +836,9 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
public void start() {}
@Override
- public void shutdown(JobStatus jobStatus) throws Exception {}
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ return FutureUtils.completedVoidFuture();
+ }
@Override
public long getAndIncrement() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
index 1dcfd33..59933e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
@@ -58,7 +58,7 @@ abstract class CheckpointIDCounterTestBase {
counter.start();
assertThat(counter.get()).isGreaterThanOrEqualTo(0L);
} finally {
- counter.shutdown(JobStatus.FINISHED);
+ counter.shutdown(JobStatus.FINISHED).join();
}
}
@@ -78,7 +78,7 @@ abstract class CheckpointIDCounterTestBase {
assertThat(counter.get()).isEqualTo(4);
assertThat(counter.getAndIncrement()).isEqualTo(4);
} finally {
- counter.shutdown(JobStatus.FINISHED);
+ counter.shutdown(JobStatus.FINISHED).join();
}
}
@@ -136,7 +136,7 @@ abstract class CheckpointIDCounterTestBase {
executor.shutdown();
}
- counter.shutdown(JobStatus.FINISHED);
+ counter.shutdown(JobStatus.FINISHED).join();
}
}
@@ -161,7 +161,7 @@ abstract class CheckpointIDCounterTestBase {
assertThat(counter.get()).isEqualTo(1338);
assertThat(counter.getAndIncrement()).isEqualTo(1338);
- counter.shutdown(JobStatus.FINISHED);
+ counter.shutdown(JobStatus.FINISHED).join();
}
/** Task repeatedly incrementing the {@link CheckpointIDCounter}. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
index 2365ed5..466a69a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
@@ -18,16 +18,18 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
/** Test {@link CheckpointIDCounter} implementation for testing the shutdown behavior. */
public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
private final Runnable startRunnable;
- private final Consumer<JobStatus> shutdownConsumer;
+ private final Function<JobStatus, CompletableFuture<Void>> shutdownFunction;
private final Supplier<Integer> getAndIncrementSupplier;
private final Supplier<Integer> getSupplier;
private final Consumer<Long> setCountConsumer;
@@ -36,18 +38,22 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
CompletableFuture<JobStatus> shutdownFuture) {
return TestingCheckpointIDCounter.builder()
.withStartRunnable(() -> {})
- .withShutdownConsumer(shutdownFuture::complete)
+ .withShutdownConsumer(
+ jobStatus -> {
+ shutdownFuture.complete(jobStatus);
+ return FutureUtils.completedVoidFuture();
+ })
.build();
}
private TestingCheckpointIDCounter(
Runnable startRunnable,
- Consumer<JobStatus> shutdownConsumer,
+ Function<JobStatus, CompletableFuture<Void>> shutdownFunction,
Supplier<Integer> getAndIncrementSupplier,
Supplier<Integer> getSupplier,
Consumer<Long> setCountConsumer) {
this.startRunnable = startRunnable;
- this.shutdownConsumer = shutdownConsumer;
+ this.shutdownFunction = shutdownFunction;
this.getAndIncrementSupplier = getAndIncrementSupplier;
this.getSupplier = getSupplier;
this.setCountConsumer = setCountConsumer;
@@ -59,8 +65,8 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
}
@Override
- public void shutdown(JobStatus jobStatus) {
- shutdownConsumer.accept(jobStatus);
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ return shutdownFunction.apply(jobStatus);
}
@Override
@@ -86,7 +92,7 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
public static class Builder {
private Runnable startRunnable;
- private Consumer<JobStatus> shutdownConsumer;
+ private Function<JobStatus, CompletableFuture<Void>> shutdownFunction;
private Supplier<Integer> getAndIncrementSupplier;
private Supplier<Integer> getSupplier;
private Consumer<Long> setCountConsumer;
@@ -96,8 +102,9 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
return this;
}
- public Builder withShutdownConsumer(Consumer<JobStatus> shutdownConsumer) {
- this.shutdownConsumer = shutdownConsumer;
+ public Builder withShutdownConsumer(
+ Function<JobStatus, CompletableFuture<Void>> shutdownFunction) {
+ this.shutdownFunction = shutdownFunction;
return this;
}
@@ -119,7 +126,7 @@ public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
public TestingCheckpointIDCounter build() {
return new TestingCheckpointIDCounter(
startRunnable,
- shutdownConsumer,
+ shutdownFunction,
getAndIncrementSupplier,
getSupplier,
setCountConsumer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
index 798ea79..4e9c0d4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
@@ -19,16 +19,21 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.ExecutionException;
+
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Unit tests for the {@link ZooKeeperCheckpointIDCounter}. The tests are inherited from the test
@@ -38,19 +43,21 @@ class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {
private static ZooKeeperTestEnvironment zookeeper;
- @BeforeAll
- public static void setUp() throws Exception {
+ @BeforeEach
+ void setup() {
zookeeper = new ZooKeeperTestEnvironment(1);
}
- @AfterAll
- private static void tearDown() throws Exception {
- zookeeper.shutdown();
+ @AfterEach
+ void tearDown() throws Exception {
+ cleanAndStopZooKeeperIfRunning();
}
- @BeforeEach
- private void cleanUp() throws Exception {
- zookeeper.deleteAll();
+ private void cleanAndStopZooKeeperIfRunning() throws Exception {
+ if (zookeeper.getClient().isStarted()) {
+ zookeeper.deleteAll();
+ zookeeper.shutdown();
+ }
}
/** Tests that counter node is removed from ZooKeeper after shutdown. */
@@ -62,10 +69,73 @@ class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {
CuratorFramework client = zookeeper.getClient();
assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
- counter.shutdown(JobStatus.FINISHED);
+ counter.shutdown(JobStatus.FINISHED).join();
+ assertThat(client.checkExists().forPath(counter.getPath())).isNull();
+ }
+
+ @Test
+ public void testIdempotentShutdown() throws Exception {
+ ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+ counter.start();
+
+ CuratorFramework client = zookeeper.getClient();
+ counter.shutdown(JobStatus.FINISHED).join();
+
+ // shutdown shouldn't fail due to missing path
+ counter.shutdown(JobStatus.FINISHED).join();
assertThat(client.checkExists().forPath(counter.getPath())).isNull();
}
+ @Test
+ public void testShutdownWithFailureDueToMissingConnection() throws Exception {
+ ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+ counter.start();
+
+ cleanAndStopZooKeeperIfRunning();
+
+ assertThatThrownBy(() -> counter.shutdown(JobStatus.FINISHED).get())
+ .as("The shutdown should fail because of the client connection being dropped.")
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseExactlyInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ public void testShutdownWithFailureDueToExistingChildNodes() throws Exception {
+ final ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+ counter.start();
+
+ final CuratorFramework client =
+ ZooKeeperUtils.useNamespaceAndEnsurePath(zookeeper.getClient(), "/");
+ final String counterNodePath = ZooKeeperUtils.generateZookeeperPath(counter.getPath());
+ final String childNodePath =
+ ZooKeeperUtils.generateZookeeperPath(
+ counterNodePath, "unexpected-child-node-causing-a-failure");
+ client.create().forPath(childNodePath);
+
+ final String namespacedCounterNodePath =
+ ZooKeeperUtils.generateZookeeperPath(client.getNamespace(), counterNodePath);
+ final Throwable expectedRootCause =
+ KeeperException.create(KeeperException.Code.NOTEMPTY, namespacedCounterNodePath);
+ assertThatThrownBy(() -> counter.shutdown(JobStatus.FINISHED).get())
+ .as(
+ "The shutdown should fail because of a child node being present and the shutdown not performing an explicit recursive deletion.")
+ .isInstanceOf(ExecutionException.class)
+ .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
+ .anySatisfy(
+ cause ->
+ assertThat(cause)
+ .isInstanceOf(expectedRootCause.getClass())
+ .hasMessage(expectedRootCause.getMessage()));
+
+ client.delete().forPath(childNodePath);
+ counter.shutdown(JobStatus.FINISHED).join();
+
+ assertThat(client.checkExists().forPath(counterNodePath))
+ .as(
+ "A retry of the shutdown should have worked now after the root cause was resolved.")
+ .isNull();
+ }
+
/** Tests that counter node is NOT removed from ZooKeeper after suspend. */
@Test
public void testSuspendKeepsState() throws Exception {
@@ -75,13 +145,14 @@ class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {
CuratorFramework client = zookeeper.getClient();
assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
- counter.shutdown(JobStatus.SUSPENDED);
+ counter.shutdown(JobStatus.SUSPENDED).join();
assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
}
@Override
protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception {
return new ZooKeeperCheckpointIDCounter(
- zookeeper.getClient(), new DefaultLastStateConnectionStateListener());
+ ZooKeeperUtils.useNamespaceAndEnsurePath(zookeeper.getClient(), "/"),
+ new DefaultLastStateConnectionStateListener());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index a63a752..387d667 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -107,6 +107,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
@@ -1608,7 +1609,8 @@ public class DefaultSchedulerTest extends TestLogger {
throw new RuntimeException(e);
}
},
- executorService);
+ executorService,
+ log);
} finally {
executorService.shutdownNow();
}
@@ -1620,7 +1622,8 @@ public class DefaultSchedulerTest extends TestLogger {
*/
public static void doTestCheckpointCleanerIsClosedAfterCheckpointServices(
BiFunction<CheckpointRecoveryFactory, CheckpointsCleaner, SchedulerNG> schedulerFactory,
- ScheduledExecutorService executorService)
+ ScheduledExecutorService executorService,
+ Logger logger)
throws Exception {
final CountDownLatch checkpointServicesShutdownBlocked = new CountDownLatch(1);
final CountDownLatch cleanerClosed = new CountDownLatch(1);
@@ -1638,9 +1641,17 @@ public class DefaultSchedulerTest extends TestLogger {
new StandaloneCheckpointIDCounter() {
@Override
- public void shutdown(JobStatus jobStatus) throws Exception {
- checkpointServicesShutdownBlocked.await();
- super.shutdown(jobStatus);
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+ try {
+ checkpointServicesShutdownBlocked.await();
+ } catch (InterruptedException e) {
+ logger.error(
+ "An error occurred while executing waiting for the CheckpointServices shutdown.",
+ e);
+ Thread.currentThread().interrupt();
+ }
+
+ return super.shutdown(jobStatus);
}
};
final CheckpointsCleaner checkpointsCleaner =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index e673a28..6e6f6d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -1462,7 +1462,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
throw new RuntimeException(e);
}
},
- executorService);
+ executorService,
+ log);
} finally {
executorService.shutdownNow();
}