Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/14 12:43:29 UTC

[GitHub] [flink] zentol commented on a change in pull request #17474: [FLINK-18312][WIP]

zentol commented on a change in pull request #17474:
URL: https://github.com/apache/flink/pull/17474#discussion_r728922879



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
##########
@@ -227,19 +267,52 @@ public SavepointStatusHandler(
         }
 
         @Override
+        public CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(
+                @Nonnull HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request,
+                @Nonnull RestfulGateway gateway)
+                throws RestHandlerException {
+
+            final AsynchronousJobOperationKey key = getOperationKey(request);
+
+            return gateway.getSavepointStatus(key)
+                    .<AsynchronousOperationResult<SavepointInfo>>thenApply(
+                            (operationResult) -> {
+                                switch (operationResult.getStatus()) {
+                                    case SUCCESS:
+                                        return AsynchronousOperationResult.completed(
+                                                operationResultResponse(
+                                                        operationResult.getResult()));
+                                    case FAILURE:
+                                        return AsynchronousOperationResult.completed(
+                                                exceptionalOperationResultResponse(
+                                                        operationResult.getThrowable()));
+                                    case IN_PROGRESS:
+                                        return AsynchronousOperationResult.inProgress();
+                                    default:
+                                        throw new IllegalStateException(
+                                                "No handler for operation status "
+                                                        + operationResult.getStatus()
+                                                        + ", encountered for key "
+                                                        + key);
+                                }
+                            })
+                    .exceptionally(
+                            (throwable ->
+                                    AsynchronousOperationResult.completed(

Review comment:
       This doesn't seem right; the future could also have failed because of an RPC timeout.
   
   You don't need to handle this case. We will automatically return a 500 Internal Server Error, which is probably the most appropriate error anyway.
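A minimal sketch of the suggestion, using hypothetical stand-in types (`SketchStatus`, `SketchOperationResult`, `StatusMappingSketch`) rather than Flink's real `OperationResult` and `AsynchronousOperationResult`. The status lookup is mapped with `thenApply` only and has no `.exceptionally(...)`, so a failed lookup (for example an RPC timeout) leaves the returned future exceptionally completed. The assumption is that the REST handler infrastructure then turns that failure into a 500 response.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins; Flink's real OperationResult / AsynchronousOperationResult differ.
enum SketchStatus { SUCCESS, FAILURE, IN_PROGRESS }

final class SketchOperationResult {
    final SketchStatus status;
    final String result;     // savepoint path, set on SUCCESS
    final Throwable failure; // cause, set on FAILURE

    SketchOperationResult(SketchStatus status, String result, Throwable failure) {
        this.status = status;
        this.result = result;
        this.failure = failure;
    }
}

final class StatusMappingSketch {
    // Maps a successfully completed status lookup to a response. There is deliberately
    // no .exceptionally(...): if the lookup itself fails (e.g. an RPC timeout), the
    // returned future fails as well and the error propagates to the REST layer.
    static CompletableFuture<String> handle(CompletableFuture<SketchOperationResult> lookup) {
        return lookup.thenApply(
                r -> {
                    switch (r.status) {
                        case SUCCESS:
                            return "COMPLETED: " + r.result;
                        case FAILURE:
                            return "FAILED: " + r.failure.getMessage();
                        case IN_PROGRESS:
                            return "IN_PROGRESS";
                        default:
                            throw new IllegalStateException("Unknown status " + r.status);
                    }
                });
    }
}
```

The operation's own FAILURE status is still reported as a regular (completed) result. Only failures of the lookup future itself are left unhandled.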

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
##########
@@ -112,24 +116,46 @@ public SavepointHandlers(@Nullable final String defaultSavepointDir) {
         this.defaultSavepointDir = defaultSavepointDir;
     }
 
-    private abstract class SavepointHandlerBase<T extends RequestBody>
-            extends TriggerHandler<RestfulGateway, T, SavepointTriggerMessageParameters> {
+    private abstract static class SavepointHandlerBase<B extends RequestBody>
+            extends AbstractRestHandler<
+                    RestfulGateway, B, TriggerResponse, SavepointTriggerMessageParameters>
+    {
 
         SavepointHandlerBase(
                 final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
                 final Time timeout,
                 Map<String, String> responseHeaders,
-                final MessageHeaders<T, TriggerResponse, SavepointTriggerMessageParameters>
+                final MessageHeaders<B, TriggerResponse, SavepointTriggerMessageParameters>
                         messageHeaders) {
             super(leaderRetriever, timeout, responseHeaders, messageHeaders);
         }
 
-        @Override
         protected AsynchronousJobOperationKey createOperationKey(
-                final HandlerRequest<T, SavepointTriggerMessageParameters> request) {
+                final HandlerRequest<B, SavepointTriggerMessageParameters> request) {
             final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
             return AsynchronousJobOperationKey.of(new TriggerId(), jobId);
         }
+
+        @Override
+        public void close() {}
+
+        public CompletableFuture<TriggerResponse> handleRequest(
+                @Nonnull HandlerRequest<B, SavepointTriggerMessageParameters> request,
+                @Nonnull RestfulGateway gateway)
+                throws RestHandlerException {
+            final AsynchronousJobOperationKey operationKey = createOperationKey(request);
+
+            triggerOperation(request, operationKey, gateway);
+
+            return CompletableFuture.completedFuture(

Review comment:
       This could depend on the triggerOperation future, so that issues during triggering are reported directly to the user. This is important because, if nothing is actually triggered, the error would otherwise just get lost.
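A minimal sketch of making the response depend on the trigger future. `TriggerSketch`, `SavepointTrigger` and the use of `UUID`/`Void` in place of Flink's `TriggerId`/`Acknowledge` are hypothetical simplifications. The response (here just the trigger id) is derived from the trigger call with `thenApply`, so a failed trigger fails the returned future instead of being silently dropped.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

final class TriggerSketch {
    // Hypothetical stand-in for the gateway call that actually triggers the savepoint.
    interface SavepointTrigger {
        CompletableFuture<Void> trigger(String jobId, UUID triggerId);
    }

    // The response is only produced once the trigger call succeeds; if it fails,
    // the returned future fails too and the error reaches the client.
    static CompletableFuture<UUID> handleRequest(String jobId, SavepointTrigger gateway) {
        final UUID triggerId = UUID.randomUUID();
        return gateway.trigger(jobId, triggerId).thenApply(ignored -> triggerId);
    }
}
```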

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointTestUtilities.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.function.TriFunction;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/** Utilities for savepoint handler tests. */
+public class SavepointTestUtilities {
+    public static TriFunction<JobID, TriggerId, String, CompletableFuture<Acknowledge>>
+            setReferenceToTriggerId(AtomicReference<AsynchronousJobOperationKey> key) {
+        return (JobID jobId, TriggerId triggerId, String directory) -> {
+            key.set(AsynchronousJobOperationKey.of(triggerId, jobId));
+            return null;

Review comment:
       This shouldn't return null.
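A self-contained sketch of the fixed test utility. `TriFunction` and `OperationKey` below are hypothetical stand-ins for Flink's `TriFunction` and `AsynchronousJobOperationKey`, and `Void` stands in for `Acknowledge`. In the real test the lambda would presumably return `CompletableFuture.completedFuture(Acknowledge.get())`. The point is that the function returns an already-completed future rather than `null`, so callers that chain on it do not hit a `NullPointerException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class TestUtilSketch {
    // Hypothetical stand-in for org.apache.flink.util.function.TriFunction.
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // Hypothetical stand-in for AsynchronousJobOperationKey: a (jobId, triggerId) pair.
    static final class OperationKey {
        final String jobId;
        final String triggerId;

        OperationKey(String jobId, String triggerId) {
            this.jobId = jobId;
            this.triggerId = triggerId;
        }
    }

    static TriFunction<String, String, String, CompletableFuture<Void>> setReferenceToTriggerId(
            AtomicReference<OperationKey> key) {
        return (jobId, triggerId, directory) -> {
            key.set(new OperationKey(jobId, triggerId));
            // Return a completed future (carrying the Void value) instead of a null
            // reference, so thenApply/join on the result work as expected.
            return CompletableFuture.completedFuture(null);
        };
    }
}
```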

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -153,16 +170,30 @@
      *     directory should be used
      * @param terminate flag indicating if the job should terminate or just suspend
      * @param timeout for the rpc call
-     * @return Future which is completed with the savepoint path once completed
+     * @return Future which is completed once the operation is triggered successfully
      */
-    default CompletableFuture<String> stopWithSavepoint(
+    default CompletableFuture<Acknowledge> stopWithSavepoint(
             final JobID jobId,
             final String targetDirectory,
             final boolean terminate,
+            TriggerId triggerId,
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
 
+    default CompletableFuture<String> stopWithSavepointAndGetLocation(
+            final JobID jobId,
+            final String targetDirectory,
+            final boolean terminate,
+            @RpcTimeout final Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    default CompletableFuture<OperationResult<String>> getSavepointStatus(
+            AsynchronousJobOperationKey operationKey) {

Review comment:
       This should use the same key argument(s) as the trigger methods.
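A sketch of what "the same key arguments" could look like, with a hypothetical, heavily simplified gateway (`SymmetricSavepointGateway`, `InMemorySavepointGateway`) that uses `String` ids instead of Flink's `JobID`/`TriggerId`. Both methods take `(jobId, triggerId)`, and the composite key is built internally rather than by the caller. The in-memory implementation only exists to make the sketch runnable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical gateway: trigger and status lookup identify the operation the same way.
interface SymmetricSavepointGateway {
    CompletableFuture<Void> triggerSavepoint(String jobId, String triggerId, String targetDirectory);

    CompletableFuture<String> getSavepointStatus(String jobId, String triggerId);
}

final class InMemorySavepointGateway implements SymmetricSavepointGateway {
    private final Map<List<String>, String> statusByKey = new ConcurrentHashMap<>();

    // The composite key is an implementation detail; callers never construct it.
    private static List<String> key(String jobId, String triggerId) {
        return Arrays.asList(jobId, triggerId);
    }

    @Override
    public CompletableFuture<Void> triggerSavepoint(
            String jobId, String triggerId, String targetDirectory) {
        statusByKey.put(key(jobId, triggerId), "IN_PROGRESS");
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<String> getSavepointStatus(String jobId, String triggerId) {
        String status = statusByKey.get(key(jobId, triggerId));
        if (status == null) {
            CompletableFuture<String> unknown = new CompletableFuture<>();
            unknown.completeExceptionally(
                    new IllegalArgumentException("Unknown operation " + key(jobId, triggerId)));
            return unknown;
        }
        return CompletableFuture.completedFuture(status);
    }
}
```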

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -153,16 +170,30 @@
      *     directory should be used
      * @param terminate flag indicating if the job should terminate or just suspend
      * @param timeout for the rpc call
-     * @return Future which is completed with the savepoint path once completed
+     * @return Future which is completed once the operation is triggered successfully
      */
-    default CompletableFuture<String> stopWithSavepoint(
+    default CompletableFuture<Acknowledge> stopWithSavepoint(
             final JobID jobId,
             final String targetDirectory,
             final boolean terminate,
+            TriggerId triggerId,
             @RpcTimeout final Time timeout) {
         throw new UnsupportedOperationException();
     }
 
+    default CompletableFuture<String> stopWithSavepointAndGetLocation(

Review comment:
       It would be neat if we could move the methods of this variant out of the RestfulGateway interface (e.g., into the DispatcherGateway), because they are no longer used by the REST API.
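A sketch of the proposed split, using hypothetical, trimmed-down interfaces (`RestfulGatewaySketch`, `DispatcherGatewaySketch`) with `String` parameters. The real Flink interfaces have many more methods and different parameter types. The acknowledge-only variant stays on the REST-facing interface, while the location-returning variant moves to the more specific Dispatcher interface, so REST handlers cannot reach it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical REST-facing interface: only acknowledges that the operation was triggered.
interface RestfulGatewaySketch {
    CompletableFuture<Void> stopWithSavepoint(
            String jobId, String targetDirectory, boolean terminate, String triggerId);
}

// Hypothetical Dispatcher interface: adds the variant that is no longer used by
// the REST API (e.g. by the job client / MiniCluster).
interface DispatcherGatewaySketch extends RestfulGatewaySketch {
    CompletableFuture<String> stopWithSavepointAndGetLocation(
            String jobId, String targetDirectory, boolean terminate);
}

// Trivial implementation so the sketch runs; the "savepoint" completes immediately.
final class DispatcherSketch implements DispatcherGatewaySketch {
    @Override
    public CompletableFuture<Void> stopWithSavepoint(
            String jobId, String targetDirectory, boolean terminate, String triggerId) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<String> stopWithSavepointAndGetLocation(
            String jobId, String targetDirectory, boolean terminate) {
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint-" + jobId);
    }
}
```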

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -740,8 +741,8 @@ public Configuration getConfiguration() {
             JobID jobId, String targetDirectory, boolean cancelJob) {
         return runDispatcherCommand(
                 dispatcherGateway ->
-                        dispatcherGateway.triggerSavepoint(
-                                jobId, targetDirectory, cancelJob, rpcTimeout));
+                        dispatcherGateway.triggerSavepointAndGetLocation(
+                                jobId, targetDirectory, cancelJob, new TriggerId(), rpcTimeout));

Review comment:
       Ideally, this method would get by without the additional triggerId argument.
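One way the trigger id could stay out of the MiniCluster-facing signature, sketched with a hypothetical class (`LocationVariantSketch`) that uses `UUID` in place of `TriggerId`. The location-returning variant generates the id internally, reuses the id-based trigger method, and then composes with the stored result. The immediate completion at a derived path is purely illustrative.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class LocationVariantSketch {
    private final Map<UUID, CompletableFuture<String>> results = new ConcurrentHashMap<>();

    // REST-style variant: the caller supplies the trigger id; the result is looked up later.
    CompletableFuture<Void> triggerSavepoint(String jobId, String targetDirectory, UUID triggerId) {
        // Hypothetical: pretend the savepoint completes immediately at a derived path.
        results.put(
                triggerId, CompletableFuture.completedFuture(targetDirectory + "/savepoint-" + jobId));
        return CompletableFuture.completedFuture(null);
    }

    // MiniCluster-style variant: no trigger id in the signature; one is generated
    // internally and the returned future completes with the savepoint location.
    CompletableFuture<String> triggerSavepointAndGetLocation(String jobId, String targetDirectory) {
        final UUID triggerId = UUID.randomUUID();
        return triggerSavepoint(jobId, targetDirectory, triggerId)
                .thenCompose(ack -> results.get(triggerId));
    }
}
```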

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -683,19 +689,79 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobID, gateway -> gateway.triggerCheckpoint(timeout));
     }
 
+    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
+            savepointOperationCache = new CompletedOperationCache<>();
+
     @Override
-    public CompletableFuture<String> triggerSavepoint(
+    // TODO: this needs to return a future which gets completed once the savepoint is actually
+    // TODO: triggered; if completed, return 202 ACCEPTED; on error, handle it
+    // TODO: / we may need separate versions of this method for
+    // TODO: the REST API / (job client / minicluster), which _may_ need to be wired down to the
+    // TODO: JobMaster
+    public CompletableFuture<Acknowledge> triggerSavepoint(
             final JobID jobId,
             final String targetDirectory,
             final boolean cancelJob,
+            TriggerId operationId,
             final Time timeout) {
 
+        AsynchronousJobOperationKey operationKey =
+                AsynchronousJobOperationKey.of(operationId, jobId);
+        Optional<OperationResult<String>> existingTriggerResultOptional =
+                savepointOperationCache.get(operationKey);
+
+        if (existingTriggerResultOptional.isEmpty()) {

Review comment:
       To avoid bloating the Dispatcher more than it already is, we should try to encapsulate as much logic as possible in a new Dispatcher component that holds the caches (which we will extend in the future, after all) and all of the logic for handling already existing operations.
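A minimal sketch of such a component, under the assumption that it only needs to deduplicate triggers and expose results. `CachedOperationsSketch` is a hypothetical name and omits the eviction and completed-operation semantics of Flink's `CompletedOperationCache`. `ConcurrentHashMap.computeIfAbsent` ensures an operation is started at most once per key, so a repeated trigger with the same key is acknowledged without re-triggering, and the Dispatcher would only delegate to it.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper owned by the Dispatcher: holds the operation cache and the
// "already triggered?" logic, keeping both out of the Dispatcher itself.
final class CachedOperationsSketch<K, R> {
    private final Map<K, CompletableFuture<R>> operations = new ConcurrentHashMap<>();

    // Starts the operation only if none exists for this key yet; the supplier is
    // invoked at most once per key.
    CompletableFuture<Void> trigger(K key, Supplier<CompletableFuture<R>> operation) {
        operations.computeIfAbsent(key, k -> operation.get());
        return CompletableFuture.completedFuture(null);
    }

    // Status lookup: empty if the key is unknown, else the (possibly still running) result.
    Optional<CompletableFuture<R>> get(K key) {
        return Optional.ofNullable(operations.get(key));
    }
}
```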




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org