You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/08/27 06:55:17 UTC

[flink] branch release-1.15 updated: [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new fcff4903c8d [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
fcff4903c8d is described below

commit fcff4903c8d625edb8f4e33b03bfded52c3deba8
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Mon Aug 15 23:03:19 2022 +0800

    [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
    
    This closes #20673.
---
 .../KubernetesStateHandleStore.java                | 16 +++--
 .../KubernetesStateHandleStoreTest.java            | 79 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index 0716b58ec34..42f07003207 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -648,10 +648,12 @@ public class KubernetesStateHandleStore<T extends Serializable>
     private Optional<KubernetesConfigMap> addEntry(
             KubernetesConfigMap configMap, String key, byte[] serializedStateHandle)
             throws Exception {
-        final String content = configMap.getData().get(key);
-        if (content != null) {
+        final String oldBase64Content = configMap.getData().get(key);
+        final String newBase64Content = toBase64(serializedStateHandle);
+        if (oldBase64Content != null) {
             try {
-                final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content);
+                final StateHandleWithDeleteMarker<T> stateHandle =
+                        deserializeStateHandle(oldBase64Content);
                 if (stateHandle.isMarkedForDeletion()) {
                     // This might be a left-over after the fail-over. As the remove operation is
                     // idempotent let's try to finish it.
@@ -660,6 +662,12 @@ public class KubernetesStateHandleStore<T extends Serializable>
                                 "Unable to remove the marked as deleting entry.");
                     }
                 } else {
+                    // Due to network issues, the Kubernetes client may retry a transaction
+                    // that has already succeeded. So we simply ignore the request when the
+                    // new content is the same as the existing one.
+                    if (oldBase64Content.equals(newBase64Content)) {
+                        return Optional.of(configMap);
+                    }
                     throw getKeyAlreadyExistException(key);
                 }
             } catch (IOException e) {
@@ -668,7 +676,7 @@ public class KubernetesStateHandleStore<T extends Serializable>
                 logInvalidEntry(key, configMapName, e);
             }
         }
-        configMap.getData().put(key, toBase64(serializedStateHandle));
+        configMap.getData().put(key, newBase64Content);
         return Optional.of(configMap);
     }
 
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
index c2b46c8ed80..f0106cdaf54 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.StateHandleWithDeleteMarker;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
@@ -29,10 +30,13 @@ import org.apache.flink.runtime.persistence.StateHandleStore;
 import org.apache.flink.runtime.persistence.StringResourceVersion;
 import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
+import io.fabric8.kubernetes.client.KubernetesClientException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -41,6 +45,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -221,6 +229,45 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe
         };
     }
 
+    @Test
+    public void testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents()
+            throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final FlinkKubeClient anotherFlinkKubeClient =
+                                    createFlinkKubeClientBuilder()
+                                            .setCheckAndUpdateConfigMapFunction(
+                                                    (configMapName, function) ->
+                                                            retryWithFirstFailedK8sOperation(
+                                                                    function, getLeaderConfigMap()))
+                                            .build();
+                            final KubernetesStateHandleStore<
+                                            TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    anotherFlinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            store.addAndLock(key, state);
+                            assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1));
+                            assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                            assertThat(store.getAllHandles().size(), is(1));
+                            assertThat(store.getAllHandles(), containsInAnyOrder(key));
+                            assertThat(
+                                    store.getAndLock(key).retrieveState().getValue(),
+                                    is(state.getValue()));
+                        });
+            }
+        };
+    }
+
     @Test
     public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception {
         new Context() {
@@ -1119,4 +1166,36 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe
         configMap.getData().put(key, deleting);
         return state;
     }
+
+    private static CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function,
+            KubernetesConfigMap leaderConfigMap) {
+        final AtomicInteger callbackInvocationCount = new AtomicInteger(0);
+        final CompletableFuture<Boolean> result =
+                FutureUtils.retry(
+                        () ->
+                                CompletableFuture.supplyAsync(
+                                        () -> {
+                                            callbackInvocationCount.incrementAndGet();
+                                            function.apply(leaderConfigMap);
+                                            if (callbackInvocationCount.get() == 1) {
+                                                throw new KubernetesClientException(
+                                                        "Expected exception to simulate unstable "
+                                                                + "kubernetes client operation");
+                                            }
+                                            return true;
+                                        },
+                                        Executors.newDirectExecutorService()),
+                        KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES
+                                .defaultValue(),
+                        t ->
+                                ExceptionUtils.findThrowable(t, KubernetesClientException.class)
+                                        .isPresent(),
+                        Executors.newDirectExecutorService());
+        assertThat(callbackInvocationCount.get(), is(2));
+        assertThat(result.isDone(), is(true));
+        assertThat(result.isCompletedExceptionally(), is(false));
+        assertThat(result.isCancelled(), is(false));
+        return result;
+    }
 }