You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/19 14:34:48 UTC

[GitHub] [flink] zentol commented on a change in pull request #17474: [FLINK-18312] Add caching for savepoint operations to Dispatcher

zentol commented on a change in pull request #17474:
URL: https://github.com/apache/flink/pull/17474#discussion_r731914875



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
##########
@@ -103,33 +106,56 @@
  * }
  * </pre>
  */
-public class SavepointHandlers
-        extends AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, String> {
+public class SavepointHandlers {
 
     @Nullable private final String defaultSavepointDir;
 
     public SavepointHandlers(@Nullable final String defaultSavepointDir) {
         this.defaultSavepointDir = defaultSavepointDir;
     }
 
-    private abstract class SavepointHandlerBase<T extends RequestBody>
-            extends TriggerHandler<RestfulGateway, T, SavepointTriggerMessageParameters> {
+    private abstract static class SavepointHandlerBase<B extends RequestBody>
+            extends AbstractRestHandler<
+                    RestfulGateway, B, TriggerResponse, SavepointTriggerMessageParameters> {
 
         SavepointHandlerBase(
                 final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
                 final Time timeout,
                 Map<String, String> responseHeaders,
-                final MessageHeaders<T, TriggerResponse, SavepointTriggerMessageParameters>
+                final MessageHeaders<B, TriggerResponse, SavepointTriggerMessageParameters>
                         messageHeaders) {
             super(leaderRetriever, timeout, responseHeaders, messageHeaders);
         }
 
-        @Override
         protected AsynchronousJobOperationKey createOperationKey(
-                final HandlerRequest<T, SavepointTriggerMessageParameters> request) {
+                final HandlerRequest<B, SavepointTriggerMessageParameters> request) {
             final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
             return AsynchronousJobOperationKey.of(new TriggerId(), jobId);
         }
+
+        @Override
+        public void close() {}

Review comment:
       you don't need to override `close`

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link DispatcherCachedOperationsHandler} component. */
+public class DispatcherCachedOperationsHandlerTest extends TestLogger {
+
+    private CompletedOperationCache<AsynchronousJobOperationKey, String> cache;
+    private DispatcherCachedOperationsHandler handler;
+
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    private CompletableFuture<String> savepointLocationFuture = new CompletableFuture<>();
+    private final TriggerSavepointParameters savepointTriggerParameters =
+            new TriggerSavepointParameters(new JobID(), "dummyDirectory", false, Time.minutes(1));
+    private AsynchronousJobOperationKey operationKey;
+
+    @Before
+    public void setup() {
+        savepointLocationFuture = new CompletableFuture<>();
+        triggerSavepointFunction = SpyFunction.wrap(parameters -> savepointLocationFuture);
+        stopWithSavepointFunction = SpyFunction.wrap(parameters -> savepointLocationFuture);
+        cache = new CompletedOperationCache<>();
+        handler =
+                new DispatcherCachedOperationsHandler(
+                        triggerSavepointFunction, stopWithSavepointFunction, cache);
+        operationKey =
+                AsynchronousJobOperationKey.of(
+                        new TriggerId(), savepointTriggerParameters.getJobID());
+    }
+
+    @Test
+    public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                triggerSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void stopWithSavepointRepeatedly() throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.stopWithSavepoint(operationKey, savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.stopWithSavepoint(operationKey, savepointTriggerParameters);
+
+        assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                stopWithSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void returnsFailedFutureIfOperationFails()
+            throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> acknowledgeRegisteredOperation =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+        savepointLocationFuture.completeExceptionally(new RuntimeException("Expected failure"));
+        CompletableFuture<Acknowledge> failedAcknowledgeFuture =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        assertThat(acknowledgeRegisteredOperation.get(), is(Acknowledge.get()));
+        assertThrows(ExecutionException.class, failedAcknowledgeFuture::get);
+    }
+
+    @Test
+    public void returnsFailedFutureIfCacheIsShuttingDown() throws InterruptedException {
+        cache.closeAsync();
+        CompletableFuture<Acknowledge> returnedFuture =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        try {
+            returnedFuture.get();

Review comment:
       Use `FlinkMatchers#futureWillCompleteExceptionally` instead

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointParameters.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class TriggerSavepointParameters {

Review comment:
       Why did you go with this instead of a `Trigger[StopWith]SavepointFunction` interface(s)?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose trigger parameters and
+ * results are cached.
+ */
+public class DispatcherCachedOperationsHandler {
+
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointTriggerCache;
+
+    private final Function<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+
+    private final Function<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this(triggerSavepointFunction, stopWithSavepointFunction, new CompletedOperationCache<>());
+    }
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+        this.savepointTriggerCache = savepointTriggerCache;
+    }
+
+    public CompletableFuture<Acknowledge> triggerSavepoint(
+            AsynchronousJobOperationKey operationKey, TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, triggerSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<Acknowledge> stopWithSavepoint(
+            AsynchronousJobOperationKey operationKey, TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, stopWithSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        CompletableFuture.failedFuture(
+                                new UnknownOperationKeyException(operationKey)));
+    }
+
+    public CompletableFuture<Void> shutDownCache() {
+        return savepointTriggerCache.closeAsync();
+    }
+
+    private <P> CompletableFuture<Acknowledge> registerOperationIdempotently(
+            AsynchronousJobOperationKey operationKey,
+            Function<P, CompletableFuture<String>> operation,
+            P parameters) {
+        Optional<OperationResult<String>> resultOptional = savepointTriggerCache.get(operationKey);
+        if (resultOptional.isPresent()) {
+            return convertToFuture(resultOptional.get());
+        }
+
+        try {
+            savepointTriggerCache.registerOngoingOperation(
+                    operationKey, operation.apply(parameters));
+        } catch (IllegalStateException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(this::convertToFuture)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Internal cache error: Failed to retrieve status"));

Review comment:
       We should be able to just return an Acknowledge here. We already covered the case of pre-existing operations at the top, and thus know that the operation that we just added is still ongoing.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.Is.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+/** Tests for the {@link DispatcherCachedOperationsHandler} component. */
+public class DispatcherCachedOperationsHandlerTest extends TestLogger {
+
+    private CompletedOperationCache<AsynchronousJobOperationKey, String> cache;
+    private DispatcherCachedOperationsHandler handler;
+
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+    private SpyFunction<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    private CompletableFuture<String> savepointLocationFuture = new CompletableFuture<>();
+    private final TriggerSavepointParameters savepointTriggerParameters =
+            new TriggerSavepointParameters(new JobID(), "dummyDirectory", false, Time.minutes(1));
+    private AsynchronousJobOperationKey operationKey;
+
+    @Before
+    public void setup() {
+        savepointLocationFuture = new CompletableFuture<>();
+        triggerSavepointFunction = SpyFunction.wrap(parameters -> savepointLocationFuture);
+        stopWithSavepointFunction = SpyFunction.wrap(parameters -> savepointLocationFuture);
+        cache = new CompletedOperationCache<>();
+        handler =
+                new DispatcherCachedOperationsHandler(
+                        triggerSavepointFunction, stopWithSavepointFunction, cache);
+        operationKey =
+                AsynchronousJobOperationKey.of(
+                        new TriggerId(), savepointTriggerParameters.getJobID());
+    }
+
+    @Test
+    public void triggerSavepointRepeatedly() throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        assertThat(triggerSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                triggerSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void stopWithSavepointRepeatedly() throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> firstAcknowledge =
+                handler.stopWithSavepoint(operationKey, savepointTriggerParameters);
+        CompletableFuture<Acknowledge> secondAcknowledge =
+                handler.stopWithSavepoint(operationKey, savepointTriggerParameters);
+
+        assertThat(stopWithSavepointFunction.getNumberOfInvocations(), is(1));
+        assertThat(
+                stopWithSavepointFunction.getInvocationParameters().get(0),
+                is(savepointTriggerParameters));
+
+        assertThat(firstAcknowledge.get(), is(Acknowledge.get()));
+        assertThat(secondAcknowledge.get(), is(Acknowledge.get()));
+    }
+
+    @Test
+    public void returnsFailedFutureIfOperationFails()
+            throws ExecutionException, InterruptedException {
+        CompletableFuture<Acknowledge> acknowledgeRegisteredOperation =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+        savepointLocationFuture.completeExceptionally(new RuntimeException("Expected failure"));
+        CompletableFuture<Acknowledge> failedAcknowledgeFuture =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        assertThat(acknowledgeRegisteredOperation.get(), is(Acknowledge.get()));
+        assertThrows(ExecutionException.class, failedAcknowledgeFuture::get);
+    }
+
+    @Test
+    public void returnsFailedFutureIfCacheIsShuttingDown() throws InterruptedException {
+        cache.closeAsync();
+        CompletableFuture<Acknowledge> returnedFuture =
+                handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        try {
+            returnedFuture.get();
+            fail("Future should have completed exceptionally");
+        } catch (ExecutionException e) {
+            assertThat((IllegalStateException) e.getCause(), isA(IllegalStateException.class));
+        }
+    }
+
+    @Test
+    public void getStatus() throws ExecutionException, InterruptedException {
+        handler.triggerSavepoint(operationKey, savepointTriggerParameters);
+
+        String savepointLocation = "location";
+        savepointLocationFuture.complete(savepointLocation);
+
+        CompletableFuture<OperationResult<String>> statusFuture =
+                handler.getSavepointStatus(operationKey);
+
+        assertEquals(statusFuture.get(), OperationResult.success(savepointLocation));
+    }
+
+    @Test
+    public void getStatusFailsIfKeyUnknown() throws InterruptedException {
+        CompletableFuture<OperationResult<String>> statusFuture =
+                handler.getSavepointStatus(operationKey);
+
+        try {
+            statusFuture.get();
+            fail("Retrieving the status should have failed");
+        } catch (ExecutionException e) {

Review comment:
       Same as above: use `FlinkMatchers#futureWillCompleteExceptionally` instead of the try/fail/catch pattern.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
##########
@@ -194,11 +195,12 @@ public void testUnknownTriggerId() throws Exception {
      */
     @Test
     public void testCloseShouldFinishOnFirstServedResult() throws Exception {
-        final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+        final CompletableFuture<Acknowledge> acknowledgeFuture = new CompletableFuture<>();
         final TestingRestfulGateway testingRestfulGateway =
                 new TestingRestfulGateway.Builder()
                         .setTriggerSavepointFunction(
-                                (JobID jobId, String directory) -> savepointFuture)
+                                (AsynchronousJobOperationKey operationKey, String directory) ->

Review comment:
       I'm wondering if we shouldn't use a different method (bit weird that we're using the one method that isn't used by the AsyncOperationHandlers), or even completely isolate this test from the RestfulGateway (because we really shouldn't have to change anything here).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose trigger parameters and
+ * results are cached.
+ */
+public class DispatcherCachedOperationsHandler {
+
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointTriggerCache;
+
+    private final Function<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+
+    private final Function<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this(triggerSavepointFunction, stopWithSavepointFunction, new CompletedOperationCache<>());
+    }
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+        this.savepointTriggerCache = savepointTriggerCache;
+    }
+
+    public CompletableFuture<Acknowledge> triggerSavepoint(
+            AsynchronousJobOperationKey operationKey, TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, triggerSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<Acknowledge> stopWithSavepoint(
+            AsynchronousJobOperationKey operationKey, TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, stopWithSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        CompletableFuture.failedFuture(
+                                new UnknownOperationKeyException(operationKey)));
+    }
+
+    public CompletableFuture<Void> shutDownCache() {
+        return savepointTriggerCache.closeAsync();
+    }
+
+    private <P> CompletableFuture<Acknowledge> registerOperationIdempotently(
+            AsynchronousJobOperationKey operationKey,
+            Function<P, CompletableFuture<String>> operation,
+            P parameters) {
+        Optional<OperationResult<String>> resultOptional = savepointTriggerCache.get(operationKey);
+        if (resultOptional.isPresent()) {
+            return convertToFuture(resultOptional.get());
+        }
+
+        try {
+            savepointTriggerCache.registerOngoingOperation(
+                    operationKey, operation.apply(parameters));
+        } catch (IllegalStateException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(this::convertToFuture)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Internal cache error: Failed to retrieve status"));
+    }
+
+    private CompletableFuture<Acknowledge> convertToFuture(OperationResult<String> result) {
+        if (result.getStatus() == OperationResultStatus.FAILURE) {
+            return CompletableFuture.failedFuture(result.getThrowable());

Review comment:
       Let's wrap the Throwable in something like an `OperationAlreadyFailedException`, so that the handlers can differentiate between this kind of failure and some request validation error in the `Dispatcher`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose trigger parameters and
+ * results are cached.
+ */
+public class DispatcherCachedOperationsHandler {
+
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointTriggerCache;
+
+    private final Function<TriggerSavepointParameters, CompletableFuture<String>>
+            triggerSavepointFunction;
+
+    private final Function<TriggerSavepointParameters, CompletableFuture<String>>
+            stopWithSavepointFunction;
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this(triggerSavepointFunction, stopWithSavepointFunction, new CompletedOperationCache<>());
+    }
+
+    DispatcherCachedOperationsHandler(
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    triggerSavepointFunction,
+            Function<TriggerSavepointParameters, CompletableFuture<String>>
+                    stopWithSavepointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+        this.savepointTriggerCache = savepointTriggerCache;
+    }
+
+    public CompletableFuture<Acknowledge> triggerSavepoint(
+            AsynchronousJobOperationKey operationKey, TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, triggerSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<Acknowledge> stopWithSavepoint(
+            AsynchronousJobOperationKey operationKey, TriggerSavepointParameters parameters) {
+        return registerOperationIdempotently(operationKey, stopWithSavepointFunction, parameters);
+    }
+
+    public CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return savepointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        CompletableFuture.failedFuture(
+                                new UnknownOperationKeyException(operationKey)));
+    }
+
+    public CompletableFuture<Void> shutDownCache() {
+        return savepointTriggerCache.closeAsync();
+    }
+
+    private <P> CompletableFuture<Acknowledge> registerOperationIdempotently(
+            AsynchronousJobOperationKey operationKey,
+            Function<P, CompletableFuture<String>> operation,
+            P parameters) {
+        Optional<OperationResult<String>> resultOptional = savepointTriggerCache.get(operationKey);
+        if (resultOptional.isPresent()) {
+            return convertToFuture(resultOptional.get());
+        }
+
+        try {
+            savepointTriggerCache.registerOngoingOperation(
+                    operationKey, operation.apply(parameters));
+        } catch (IllegalStateException e) {
+            return CompletableFuture.failedFuture(e);

Review comment:
       There's no need to catch this exception; it just makes the code more complicated without any real benefit.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
+import org.apache.flink.runtime.rest.handler.async.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A handler for async operations triggered by the {@link Dispatcher} whose trigger parameters and

Review comment:
       Where are we caching the trigger parameters?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointTestUtilities.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/** Utility functions used in tests. */
+public class SavepointTestUtilities {
+    public static BiFunction<AsynchronousJobOperationKey, String, CompletableFuture<Acknowledge>>

Review comment:
       These need some javadocs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org