You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/08/24 07:41:58 UTC

[flink] branch master updated: [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aae96d0c9d1 [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
aae96d0c9d1 is described below

commit aae96d0c9d1768c396bdf2ee6510677fbb8f317a
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Mon Aug 15 23:03:19 2022 +0800

    [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
    
    This closes #20590.
---
 .../KubernetesStateHandleStore.java                | 16 +++--
 .../KubernetesStateHandleStoreTest.java            | 75 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index 0716b58ec34..42f07003207 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -648,10 +648,12 @@ public class KubernetesStateHandleStore<T extends Serializable>
     private Optional<KubernetesConfigMap> addEntry(
             KubernetesConfigMap configMap, String key, byte[] serializedStateHandle)
             throws Exception {
-        final String content = configMap.getData().get(key);
-        if (content != null) {
+        final String oldBase64Content = configMap.getData().get(key);
+        final String newBase64Content = toBase64(serializedStateHandle);
+        if (oldBase64Content != null) {
             try {
-                final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content);
+                final StateHandleWithDeleteMarker<T> stateHandle =
+                        deserializeStateHandle(oldBase64Content);
                 if (stateHandle.isMarkedForDeletion()) {
                     // This might be a left-over after the fail-over. As the remove operation is
                     // idempotent let's try to finish it.
@@ -660,6 +662,12 @@ public class KubernetesStateHandleStore<T extends Serializable>
                                 "Unable to remove the marked as deleting entry.");
                     }
                 } else {
+                    // The kubernetes client may retry a transaction that has already succeeded
+                    // (e.g. when the response was lost due to network issues). Treat identical
+                    // content as success so that addEntry stays idempotent for such retries.
+                    if (oldBase64Content.equals(newBase64Content)) {
+                        return Optional.of(configMap);
+                    }
                     throw getKeyAlreadyExistException(key);
                 }
             } catch (IOException e) {
@@ -668,7 +676,7 @@ public class KubernetesStateHandleStore<T extends Serializable>
                 logInvalidEntry(key, configMapName, e);
             }
         }
-        configMap.getData().put(key, toBase64(serializedStateHandle));
+        configMap.getData().put(key, newBase64Content);
         return Optional.of(configMap);
     }
 
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
index e246dda45a9..2d58f93a8fe 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.StateHandleWithDeleteMarker;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
@@ -27,10 +28,13 @@ import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
 import org.apache.flink.runtime.persistence.StateHandleStore;
 import org.apache.flink.runtime.persistence.StringResourceVersion;
 import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
+import io.fabric8.kubernetes.client.KubernetesClientException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -39,6 +43,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -222,6 +230,50 @@ class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase
         };
     }

+    /**
+     * Verifies that {@code addAndLock} tolerates a retried ConfigMap update whose first attempt
+     * was applied but reported as failed. The retry finds identical content, so no "already
+     * exists" error is raised, the state is stored only once and nothing is discarded.
+     */
+    @Test
+    void testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final FlinkKubeClient anotherFlinkKubeClient =
+                                    createFlinkKubeClientBuilder()
+                                            .setCheckAndUpdateConfigMapFunction(
+                                                    (configMapName, function) ->
+                                                            retryWithFirstFailedK8sOperation(
+                                                                    function, getLeaderConfigMap()))
+                                            .build();
+                            final KubernetesStateHandleStore<
+                                            TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    anotherFlinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            store.addAndLock(key, state);
+                            assertThat(TestingLongStateHandleHelper.getGlobalStorageSize())
+                                    .isEqualTo(1);
+                            assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount())
+                                    .isEqualTo(0);
+                            assertThat(store.getAllHandles()).hasSize(1);
+                            assertThat(store.getAllHandles()).contains(key);
+                            assertThat(store.getAndLock(key).retrieveState().getValue())
+                                    .isEqualTo(state.getValue());
+                        });
+            }
+        };
+    }
+
     @Test
     void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception {
         new Context() {
@@ -1143,4 +1195,43 @@ class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase
         configMap.getData().put(key, deleting);
         return state;
     }
+
+    /**
+     * Applies {@code function} to {@code leaderConfigMap} via {@link FutureUtils#retry}, failing
+     * the first attempt with a {@link KubernetesClientException} after the update was already
+     * applied. This simulates a transaction that succeeded but was reported as failed. Asserts
+     * that exactly two attempts were made and returns the future of the retried operation.
+     */
+    private static CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function,
+            KubernetesConfigMap leaderConfigMap) {
+        final AtomicInteger callbackInvocationCount = new AtomicInteger(0);
+        final CompletableFuture<Boolean> result =
+                FutureUtils.retry(
+                        () ->
+                                CompletableFuture.supplyAsync(
+                                        () -> {
+                                            final int attempt =
+                                                    callbackInvocationCount.incrementAndGet();
+                                            function.apply(leaderConfigMap);
+                                            if (attempt == 1) {
+                                                throw new KubernetesClientException(
+                                                        "Expected exception to simulate unstable "
+                                                                + "kubernetes client operation");
+                                            }
+                                            return true;
+                                        },
+                                        // Run attempts synchronously. Without an executor,
+                                        // supplyAsync uses the common pool and the assertion
+                                        // below may read the counter too early.
+                                        Executors.newDirectExecutorService()),
+                        KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES
+                                .defaultValue(),
+                        t ->
+                                ExceptionUtils.findThrowable(t, KubernetesClientException.class)
+                                        .isPresent(),
+                        Executors.newDirectExecutorService());
+        assertThat(callbackInvocationCount.get()).isEqualTo(2);
+        return result;
+    }
 }