You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/19 14:05:32 UTC
[flink] 01/02: [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph has not been put into the JobGraphStore yet
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3966744a18d9f0e408474969c1f61ea43c8b41d5
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Mar 16 18:09:01 2022 +0100
[FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph has not been put into the JobGraphStore yet
This can happen if the cleanup is triggered after a
failover for a job that has a dirty JobResultStore
entry (i.e. a globally-terminated job). In that case,
the JobGraph is not recovered and is, therefore, never
added to the internal addedJobGraphs collection.
This required KubernetesStateHandleStore.releaseAndTryRemove
to work for non-existing state as well. The
ZooKeeperStateHandleStore.releaseAndTryRemove implementation
is already idempotent in this respect.
---
.../KubernetesStateHandleStore.java | 9 ++++++--
.../KubernetesStateHandleStoreTest.java | 27 ++++++++++++++++++++++
.../runtime/jobmanager/DefaultJobGraphStore.java | 8 +++----
.../runtime/persistence/StateHandleStore.java | 3 ++-
.../jobmanager/DefaultJobGraphStoreTest.java | 27 ++++++++++++++--------
.../zookeeper/ZooKeeperStateHandleStoreTest.java | 12 ++++++++++
6 files changed, 69 insertions(+), 17 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index cc7153d..0716b58 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -51,6 +51,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -464,13 +465,14 @@ public class KubernetesStateHandleStore<T extends Serializable>
* It returns the {@link RetrievableStateHandle} stored under the given state node if any.
*
* @param key Key to be removed from ConfigMap
- * @return True if the state handle is removed successfully
+ * @return True if the state handle isn't listed anymore.
* @throws Exception if removing the key or discarding the state failed
*/
@Override
public boolean releaseAndTryRemove(String key) throws Exception {
checkNotNull(key, "Key in ConfigMap.");
final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>();
+ final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false);
return updateConfigMap(
configMap -> {
final String content = configMap.getData().get(key);
@@ -496,6 +498,8 @@ public class KubernetesStateHandleStore<T extends Serializable>
Objects.requireNonNull(configMap.getData().remove(key));
}
return Optional.of(configMap);
+ } else {
+ stateHandleDoesNotExist.set(true);
}
return Optional.empty();
})
@@ -516,7 +520,8 @@ public class KubernetesStateHandleStore<T extends Serializable>
throw new CompletionException(e);
}
}
- return CompletableFuture.completedFuture(updated);
+ return CompletableFuture.completedFuture(
+ stateHandleDoesNotExist.get() || updated);
})
.get();
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
index 9896d49..51294f4 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
@@ -804,6 +804,33 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe
}
@Test
+ public void testRemoveOfNonExistingState() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+ TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+ assertThat(store.getAllAndLock().size(), is(0));
+ assertThat(store.releaseAndTryRemove(key), is(true));
+ assertThat(store.getAllAndLock().size(), is(0));
+
+ assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+ });
+ }
+ };
+ }
+
+ @Test
public void testRemoveFailedShouldNotDiscardState() throws Exception {
new Context() {
{
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
index fda4964..e66a172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
@@ -249,12 +249,10 @@ public class DefaultJobGraphStore<R extends ResourceVersion<R>>
() -> {
LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore);
- if (addedJobGraphs.contains(jobId)) {
- final String name = jobGraphStoreUtil.jobIDToName(jobId);
- releaseAndRemoveOrThrowCompletionException(jobId, name);
+ final String name = jobGraphStoreUtil.jobIDToName(jobId);
+ releaseAndRemoveOrThrowCompletionException(jobId, name);
- addedJobGraphs.remove(jobId);
- }
+ addedJobGraphs.remove(jobId);
LOG.info("Removed job graph {} from {}.", jobId, jobGraphStateHandleStore);
},
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
index 5cbfeac..f8e6d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
@@ -121,7 +121,8 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers
* node if any. Also the state on the external storage will be discarded.
*
* @param name Key name in ConfigMap or child path name in ZooKeeper
- * @return True if the state handle could be removed.
+ * @return {@code true} if the state handle is removed (also if it didn't exist in the first
+ * place); otherwise {@code false}.
* @throws Exception if releasing, removing the handles or discarding the state failed
*/
boolean releaseAndTryRemove(String name) throws Exception;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
index d45a39a..1343e7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
@@ -41,8 +41,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
/**
@@ -220,14 +221,22 @@ public class DefaultJobGraphStoreTest extends TestLogger {
.globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor())
.join();
- try {
- removeFuture.get(timeout, TimeUnit.MILLISECONDS);
- fail(
- "We should get an expected timeout because we are removing a non-existed job graph.");
- } catch (TimeoutException ex) {
- // expected
- }
- assertThat(removeFuture.isDone(), is(false));
+ assertThat(removeFuture.isDone(), is(true));
+ }
+
+ @Test
+ public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception {
+ final TestingStateHandleStore<JobGraph> stateHandleStore =
+ builder.setRemoveFunction(name -> false).build();
+
+ final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore);
+ assertThrows(
+ ExecutionException.class,
+ () ->
+ jobGraphStore
+ .globalCleanupAsync(
+ testingJobGraph.getJobID(), Executors.directExecutor())
+ .get());
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index dc81131..ddf6fc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -224,6 +224,18 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
}
@Test
+ public void testCleanupOfNonExistingState() throws Exception {
+ final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle> testInstance =
+ new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), new TestingLongStateHandleHelper());
+
+ final String pathInZooKeeper = "/testCleanupOfNonExistingState";
+
+ assertTrue(testInstance.releaseAndTryRemove(pathInZooKeeper));
+ assertFalse(testInstance.exists(pathInZooKeeper).isExisting());
+ }
+
+ @Test
public void testRepeatableCleanupWithLockOnNode() throws Exception {
final CuratorFramework client =
ZooKeeperUtils.useNamespaceAndEnsurePath(