You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/19 14:05:32 UTC

[flink] 01/02: [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph has not been put into the JobGraphStore yet

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3966744a18d9f0e408474969c1f61ea43c8b41d5
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Mar 16 18:09:01 2022 +0100

    [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph has not been put into the JobGraphStore yet
    
    This can happen if cleanup is triggered after a
    failover while a dirty JobResultStore entry exists
    (i.e. for a globally-terminated job). In that case,
    the JobGraph is not recovered and, therefore, is
    never added to the internal addedJobGraphs
    collection.
    
    This required KubernetesStateHandleStore.releaseAndTryRemove
    to succeed for non-existing state as well. The
    ZooKeeperStateHandleStore.releaseAndTryRemove
    implementation already works like that, i.e. it is
    already idempotent in this regard.
---
 .../KubernetesStateHandleStore.java                |  9 ++++++--
 .../KubernetesStateHandleStoreTest.java            | 27 ++++++++++++++++++++++
 .../runtime/jobmanager/DefaultJobGraphStore.java   |  8 +++----
 .../runtime/persistence/StateHandleStore.java      |  3 ++-
 .../jobmanager/DefaultJobGraphStoreTest.java       | 27 ++++++++++++++--------
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 12 ++++++++++
 6 files changed, 69 insertions(+), 17 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index cc7153d..0716b58 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -51,6 +51,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -464,13 +465,14 @@ public class KubernetesStateHandleStore<T extends Serializable>
      * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
      *
      * @param key Key to be removed from ConfigMap
-     * @return True if the state handle is removed successfully
+     * @return True if the state handle isn't listed anymore.
      * @throws Exception if removing the key or discarding the state failed
      */
     @Override
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
         final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>();
+        final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false);
         return updateConfigMap(
                         configMap -> {
                             final String content = configMap.getData().get(key);
@@ -496,6 +498,8 @@ public class KubernetesStateHandleStore<T extends Serializable>
                                     Objects.requireNonNull(configMap.getData().remove(key));
                                 }
                                 return Optional.of(configMap);
+                            } else {
+                                stateHandleDoesNotExist.set(true);
                             }
                             return Optional.empty();
                         })
@@ -516,7 +520,8 @@ public class KubernetesStateHandleStore<T extends Serializable>
                                     throw new CompletionException(e);
                                 }
                             }
-                            return CompletableFuture.completedFuture(updated);
+                            return CompletableFuture.completedFuture(
+                                    stateHandleDoesNotExist.get() || updated);
                         })
                 .get();
     }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
index 9896d49..51294f4 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
@@ -804,6 +804,33 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe
     }
 
     @Test
+    public void testRemoveOfNonExistingState() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThat(store.releaseAndTryRemove(key), is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+
+                            assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                        });
+            }
+        };
+    }
+
+    @Test
     public void testRemoveFailedShouldNotDiscardState() throws Exception {
         new Context() {
             {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
index fda4964..e66a172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
@@ -249,12 +249,10 @@ public class DefaultJobGraphStore<R extends ResourceVersion<R>>
                 () -> {
                     LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore);
 
-                    if (addedJobGraphs.contains(jobId)) {
-                        final String name = jobGraphStoreUtil.jobIDToName(jobId);
-                        releaseAndRemoveOrThrowCompletionException(jobId, name);
+                    final String name = jobGraphStoreUtil.jobIDToName(jobId);
+                    releaseAndRemoveOrThrowCompletionException(jobId, name);
 
-                        addedJobGraphs.remove(jobId);
-                    }
+                    addedJobGraphs.remove(jobId);
 
                     LOG.info("Removed job graph {} from {}.", jobId, jobGraphStateHandleStore);
                 },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
index 5cbfeac..f8e6d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
@@ -121,7 +121,8 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers
      * node if any. Also the state on the external storage will be discarded.
      *
      * @param name Key name in ConfigMap or child path name in ZooKeeper
-     * @return True if the state handle could be removed.
+     * @return {@code true} if the state handle is removed (also if it didn't exist in the first
+     *     place); otherwise {@code false}.
      * @throws Exception if releasing, removing the handles or discarding the state failed
      */
     boolean releaseAndTryRemove(String name) throws Exception;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
index d45a39a..1343e7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
@@ -41,8 +41,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /**
@@ -220,14 +221,22 @@ public class DefaultJobGraphStoreTest extends TestLogger {
                 .globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor())
                 .join();
 
-        try {
-            removeFuture.get(timeout, TimeUnit.MILLISECONDS);
-            fail(
-                    "We should get an expected timeout because we are removing a non-existed job graph.");
-        } catch (TimeoutException ex) {
-            // expected
-        }
-        assertThat(removeFuture.isDone(), is(false));
+        assertThat(removeFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception {
+        final TestingStateHandleStore<JobGraph> stateHandleStore =
+                builder.setRemoveFunction(name -> false).build();
+
+        final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore);
+        assertThrows(
+                ExecutionException.class,
+                () ->
+                        jobGraphStore
+                                .globalCleanupAsync(
+                                        testingJobGraph.getJobID(), Executors.directExecutor())
+                                .get());
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index dc81131..ddf6fc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -224,6 +224,18 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
     }
 
     @Test
+    public void testCleanupOfNonExistingState() throws Exception {
+        final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle> testInstance =
+                new ZooKeeperStateHandleStore<>(
+                        ZOOKEEPER.getClient(), new TestingLongStateHandleHelper());
+
+        final String pathInZooKeeper = "/testCleanupOfNonExistingState";
+
+        assertTrue(testInstance.releaseAndTryRemove(pathInZooKeeper));
+        assertFalse(testInstance.exists(pathInZooKeeper).isExisting());
+    }
+
+    @Test
     public void testRepeatableCleanupWithLockOnNode() throws Exception {
         final CuratorFramework client =
                 ZooKeeperUtils.useNamespaceAndEnsurePath(