You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/08/24 07:41:58 UTC
[flink] branch master updated: [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new aae96d0c9d1 [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
aae96d0c9d1 is described below
commit aae96d0c9d1768c396bdf2ee6510677fbb8f317a
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Mon Aug 15 23:03:19 2022 +0800
[FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent
This closes #20590.
---
.../KubernetesStateHandleStore.java | 16 +++--
.../KubernetesStateHandleStoreTest.java | 75 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 4 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index 0716b58ec34..42f07003207 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -648,10 +648,12 @@ public class KubernetesStateHandleStore<T extends Serializable>
private Optional<KubernetesConfigMap> addEntry(
KubernetesConfigMap configMap, String key, byte[] serializedStateHandle)
throws Exception {
- final String content = configMap.getData().get(key);
- if (content != null) {
+ final String oldBase64Content = configMap.getData().get(key);
+ final String newBase64Content = toBase64(serializedStateHandle);
+ if (oldBase64Content != null) {
try {
- final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content);
+ final StateHandleWithDeleteMarker<T> stateHandle =
+ deserializeStateHandle(oldBase64Content);
if (stateHandle.isMarkedForDeletion()) {
// This might be a left-over after the fail-over. As the remove operation is
// idempotent let's try to finish it.
@@ -660,6 +662,12 @@ public class KubernetesStateHandleStore<T extends Serializable>
"Unable to remove the marked as deleting entry.");
}
} else {
+ // It could happen that the kubernetes client retries a transaction that has
+ // already succeeded due to network issues. So we simply ignore when the
+ // new content is same as the existing one.
+ if (oldBase64Content.equals(newBase64Content)) {
+ return Optional.of(configMap);
+ }
throw getKeyAlreadyExistException(key);
}
} catch (IOException e) {
@@ -668,7 +676,7 @@ public class KubernetesStateHandleStore<T extends Serializable>
logInvalidEntry(key, configMapName, e);
}
}
- configMap.getData().put(key, toBase64(serializedStateHandle));
+ configMap.getData().put(key, newBase64Content);
return Optional.of(configMap);
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
index e246dda45a9..2d58f93a8fe 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.highavailability;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.StateHandleWithDeleteMarker;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
@@ -27,10 +28,13 @@ import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
+import io.fabric8.kubernetes.client.KubernetesClientException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -39,6 +43,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.function.Predicate;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -222,6 +230,45 @@ class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase
};
}
+ @Test
+ void testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final FlinkKubeClient anotherFlinkKubeClient = // first checkAndUpdate attempt applies the update, then fails and is retried
+ createFlinkKubeClientBuilder()
+ .setCheckAndUpdateConfigMapFunction(
+ (configMapName, function) ->
+ retryWithFirstFailedK8sOperation(
+ function, getLeaderConfigMap()))
+ .build();
+ final KubernetesStateHandleStore<
+ TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ anotherFlinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+
+ store.addAndLock(key, state); // the retry sees its own identical entry and must not throw AlreadyExist
+ assertThat(TestingLongStateHandleHelper.getGlobalStorageSize()) // state stored once and never discarded
+ .isEqualTo(1);
+ assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount())
+ .isEqualTo(0);
+ assertThat(store.getAllHandles()).hasSize(1);
+ assertThat(store.getAllHandles()).contains(key);
+ assertThat(store.getAndLock(key).retrieveState().getValue())
+ .isEqualTo(state.getValue());
+ });
+ }
+ };
+ }
+
@Test
void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception {
new Context() {
@@ -1143,4 +1190,32 @@ class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase
configMap.getData().put(key, deleting);
return state;
}
+
+ private static CompletableFuture<Boolean> retryWithFirstFailedK8sOperation( // applies `function`, fails attempt #1, succeeds on retry
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function,
+ KubernetesConfigMap leaderConfigMap) {
+ final AtomicInteger callbackInvocationCount = new AtomicInteger(0);
+ final CompletableFuture<Boolean> result =
+ FutureUtils.retry(
+ () ->
+ CompletableFuture.supplyAsync( // direct executor: attempts finish before retry() returns
+ () -> {
+ final int attempt = callbackInvocationCount.incrementAndGet();
+ function.apply(leaderConfigMap); // the update is applied even on the failing attempt
+ if (attempt == 1) {
+ throw new KubernetesClientException(
+ "Expected exception to simulate unstable "
+ + "kubernetes client operation");
+ }
+ return true;
+ }, Executors.newDirectExecutorService()),
+ KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES
+ .defaultValue(),
+ t ->
+ ExceptionUtils.findThrowable(t, KubernetesClientException.class)
+ .isPresent(),
+ Executors.newDirectExecutorService());
+ assertThat(callbackInvocationCount.get()).isEqualTo(2); // deterministic: no common-pool race
+ return result;
+ }
}