Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/18 13:33:26 UTC

[GitHub] [flink] Myracle opened a new pull request, #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Myracle opened a new pull request, #21347:
URL: https://github.com/apache/flink/pull/21347

   ## What is the purpose of the change
   
   *Enhance the behavior by introducing a heartbeat between the client and the job. If the job does not receive any heartbeat from the client within the timeout, the job cancels itself.*
   
   
   ## Brief change log
   
     - *The client sends a heartbeat to the dispatcher periodically.*
     - *If a job in the dispatcher has not received a heartbeat within the timeout, it cancels itself.*
     - *The interval and timeout reuse the existing heartbeat configuration options.*
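
   The client side of this change log can be sketched as a fixed-rate reporting loop. This is a minimal sketch, not the PR's implementation: `JobHeartbeatReporter` is a hypothetical stand-in for the client-to-dispatcher call (the PR's `ClusterClient#reportHeartbeat(JobID, long)`), job IDs are plain strings, and the `expiredTimestamp` is computed on the client's clock, so a dispatcher comparing it against its own clock would be sensitive to clock skew.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical stand-in for the client-to-dispatcher heartbeat call. */
   interface JobHeartbeatReporter {
       CompletableFuture<Void> report(String jobId, long expiredTimestamp);
   }

   /** Sketch: sends a heartbeat for one job at a fixed interval until closed. */
   final class ClientHeartbeatSender implements AutoCloseable {
       private final JobHeartbeatReporter reporter;
       private final String jobId;
       private final long intervalMillis;
       private final long timeoutMillis;
       private ScheduledExecutorService scheduler;
       private ScheduledFuture<?> task;

       ClientHeartbeatSender(
               JobHeartbeatReporter reporter, String jobId, long intervalMillis, long timeoutMillis) {
           this.reporter = reporter;
           this.jobId = jobId;
           this.intervalMillis = intervalMillis;
           this.timeoutMillis = timeoutMillis;
       }

       /** Reports immediately and then every intervalMillis. */
       synchronized void start() {
           scheduler = Executors.newSingleThreadScheduledExecutor();
           task = scheduler.scheduleAtFixedRate(
                   this::sendOnce, 0L, intervalMillis, TimeUnit.MILLISECONDS);
       }

       /** Sends one heartbeat whose deadline is "now + timeout" on the client's clock. */
       void sendOnce() {
           long expiredTimestamp = System.currentTimeMillis() + timeoutMillis;
           try {
               reporter.report(jobId, expiredTimestamp)
                       .exceptionally(error -> {
                           // One lost heartbeat is tolerated; the next one may still arrive in time.
                           System.err.println("Heartbeat for job " + jobId + " failed: " + error);
                           return null;
                       });
           } catch (RuntimeException e) {
               // Never let an exception escape: scheduleAtFixedRate stops the task if one does.
               System.err.println("Heartbeat for job " + jobId + " threw: " + e);
           }
       }

       @Override
       public synchronized void close() {
           if (task != null) {
               task.cancel(false);
           }
           if (scheduler != null) {
               scheduler.shutdownNow();
           }
       }
   }
   ```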
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21347:
URL: https://github.com/apache/flink/pull/21347#issuecomment-1320008320

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a38d88888aad5de4bc47af77a3b33506e267286b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a38d88888aad5de4bc47af77a3b33506e267286b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a38d88888aad5de4bc47af77a3b33506e267286b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] Myracle commented on pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on PR #21347:
URL: https://github.com/apache/flink/pull/21347#issuecomment-1370591649

   @xintongsong Could you take a look? Thanks.


[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1064320554


##########
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
##########
@@ -1756,6 +1756,9 @@ public final class ConfigConstants {
     /** The user lib directory name. */
     public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib";
 
+    /** The initial client timeout when submitting the job. */
+    public static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT = "initialClientHeartbeatTimeout";

Review Comment:
   `ConfigConstants` is a public API annotated with `@Public`. We should not add this internal config key here.
   
   I think it would be good enough to introduce the config key as a constant in `JobGraph`.



##########
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##########
@@ -206,4 +206,14 @@ default CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
     default CompletableFuture<Void> invalidateClusterDataset(AbstractID clusterDatasetId) {
         return CompletableFuture.completedFuture(null);
     }
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * @param jobId The jobId for the client and the job.
+     * @return
+     */
+    default CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return CompletableFuture.completedFuture(null);

Review Comment:
   ```suggestion
           return FutureUtils.completedVoidFuture();
   ```



[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1036742632


##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java:
##########
@@ -271,4 +271,9 @@ default CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoor
             @RpcTimeout Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /** The client reports the heartbeat to the dispatcher for aliveness. */
+    default CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        return CompletableFuture.completedFuture(null);

Review Comment:
   Use `FutureUtils.completedVoidFuture()` to reduce object creation.
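
   The saving comes from returning one shared, already-completed future instead of allocating a new one on every call. A minimal sketch of that pattern follows; `VoidFutures` and `HeartbeatDefaultGateway` are hypothetical names, and while Flink's `FutureUtils.completedVoidFuture()` is assumed to work along these lines, this is not its source.

   ```java
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical holder illustrating a cached, already-completed Void future. */
   final class VoidFutures {
       // Created once. Calling complete() on an already-completed future is a no-op,
       // so sharing it is safe as long as no caller uses obtrudeValue/obtrudeException.
       private static final CompletableFuture<Void> COMPLETED =
               CompletableFuture.completedFuture(null);

       private VoidFutures() {}

       static CompletableFuture<Void> completedVoidFuture() {
           return COMPLETED;
       }
   }

   /** Sketch of a default gateway method that allocates nothing per call. */
   interface HeartbeatDefaultGateway {
       default CompletableFuture<Void> reportJobClientHeartbeat(String jobId) {
           return VoidFutures.completedVoidFuture();
       }
   }
   ```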



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        if (!getJobManagerRunner(jobId).isPresent()) {
+            log.warn("Fail to find job {} for client.", jobId);
+        } else {
+            log.debug("Job {} receives client's heartbeat.", jobId);
+            markJobClientAliveness(jobId);
+            resetJobClientAlivenessCheck(jobId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markJobClientAliveness(JobID jobId) {
+        if (jobClientAlivenessFutures.containsKey(jobId)) {
+            jobClientAlivenessFutures.get(jobId).complete(null);
+        }
+    }
+
+    private void resetJobClientAlivenessCheck(JobID jobId) {
+        CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+        jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+        FutureUtils.orTimeout(
+                clientAlivenessFuture,
+                jobClientHeartbeatTimeout,
+                TimeUnit.MILLISECONDS,
+                getMainThreadExecutor());
+        clientAlivenessFuture.whenComplete(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   The future-based approach seems to be overkill for this feature. For each job and on each heartbeat, it creates a new future, which seems expensive.
   
   I'd suggest simply recording the last heartbeat timestamp of each job and periodically checking whether any job has timed out. This is similar to how we check and release idle TMs. It may not trigger the timeout at exactly the configured time, but it would be much cheaper.
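
   A minimal sketch of the timestamp-based alternative, assuming job IDs are plain strings, the current time is passed in explicitly (which keeps it testable), and all calls happen on a single thread such as the dispatcher's main thread, so no locking is used. `ClientHeartbeatTracker` is a hypothetical name.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;

   /** Sketch: last-heartbeat bookkeeping plus a periodic sweep for timed-out jobs. */
   final class ClientHeartbeatTracker {
       private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();
       private final long timeoutMillis;

       ClientHeartbeatTracker(long timeoutMillis) {
           this.timeoutMillis = timeoutMillis;
       }

       /** Called on every heartbeat: a single map write, no new future per heartbeat. */
       void recordHeartbeat(String jobId, long nowMillis) {
           lastHeartbeatMillis.put(jobId, nowMillis);
       }

       /** Stops tracking a job, e.g. once it reaches a terminal state. */
       void remove(String jobId) {
           lastHeartbeatMillis.remove(jobId);
       }

       /** Returns, and stops tracking, every job whose last heartbeat is older than the timeout. */
       List<String> collectTimedOutJobs(long nowMillis) {
           List<String> timedOut = new ArrayList<>();
           Iterator<Map.Entry<String, Long>> it = lastHeartbeatMillis.entrySet().iterator();
           while (it.hasNext()) {
               Map.Entry<String, Long> entry = it.next();
               if (nowMillis - entry.getValue() > timeoutMillis) {
                   timedOut.add(entry.getKey());
                   it.remove();
               }
           }
           return timedOut;
       }
   }
   ```

   If the sweep runs every N ms, a timeout is detected up to N ms late, which is the imprecision the comment above accepts in exchange for lower cost.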



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        if (!getJobManagerRunner(jobId).isPresent()) {
+            log.warn("Fail to find job {} for client.", jobId);
+        } else {
+            log.debug("Job {} receives client's heartbeat.", jobId);
+            markJobClientAliveness(jobId);
+            resetJobClientAlivenessCheck(jobId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markJobClientAliveness(JobID jobId) {
+        if (jobClientAlivenessFutures.containsKey(jobId)) {
+            jobClientAlivenessFutures.get(jobId).complete(null);
+        }
+    }
+
+    private void resetJobClientAlivenessCheck(JobID jobId) {
+        CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+        jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+        FutureUtils.orTimeout(
+                clientAlivenessFuture,
+                jobClientHeartbeatTimeout,
+                TimeUnit.MILLISECONDS,
+                getMainThreadExecutor());
+        clientAlivenessFuture.whenComplete(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   What happens if something goes wrong while canceling the job, e.g., a timeout?
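
   One possible answer is to observe the future returned by the cancel call instead of dropping it, retry a bounded number of times, and log when giving up. This is a minimal sketch under stated assumptions: `CancelRetry`, `cancelJobWithRetry`, and the `Supplier`-based cancel action are hypothetical, and a real dispatcher would presumably delay retries and run them on its main thread rather than retrying immediately.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;

   /** Sketch: retries a failed job cancellation instead of ignoring the failure. */
   final class CancelRetry {
       private CancelRetry() {}

       /** Completes normally once a cancel attempt succeeds; fails with the last error otherwise. */
       static CompletableFuture<Void> cancelJobWithRetry(
               String jobId, Supplier<CompletableFuture<Void>> cancelAction, int maxAttempts) {
           CompletableFuture<Void> result = new CompletableFuture<>();
           attempt(jobId, cancelAction, 1, maxAttempts, result);
           return result;
       }

       private static void attempt(
               String jobId,
               Supplier<CompletableFuture<Void>> cancelAction,
               int attemptNo,
               int maxAttempts,
               CompletableFuture<Void> result) {
           CompletableFuture<Void> cancelFuture;
           try {
               cancelFuture = cancelAction.get();
           } catch (RuntimeException e) {
               // Treat a synchronous failure like a failed future.
               cancelFuture = new CompletableFuture<>();
               cancelFuture.completeExceptionally(e);
           }
           cancelFuture.whenComplete(
                   (ignored, error) -> {
                       if (error == null) {
                           result.complete(null);
                       } else if (attemptNo < maxAttempts) {
                           System.err.printf(
                                   "Canceling job %s failed (attempt %d of %d), retrying: %s%n",
                                   jobId, attemptNo, maxAttempts, error);
                           attempt(jobId, cancelAction, attemptNo + 1, maxAttempts, result);
                       } else {
                           System.err.printf(
                                   "Giving up canceling job %s after %d attempts: %s%n",
                                   jobId, maxAttempts, error);
                           result.completeExceptionally(error);
                       }
                   });
       }
   }
   ```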



[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1053915899


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1344,6 +1344,13 @@ public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
                                         metaInfoMap -> new HashSet<>(metaInfoMap.keySet())));
     }
 
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return runDispatcherCommand(
+                dispatcherGateway ->
+                        dispatcherGateway.reportJobClientHeartbeat(
+                                jobId, expiredTimestamp, rpcTimeout));
+    }

Review Comment:
   I'm fine with keeping it if it helps with testing.



[GitHub] [flink] Myracle commented on pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on PR #21347:
URL: https://github.com/apache/flink/pull/21347#issuecomment-1370384506

   @flinkbot run azure


[GitHub] [flink] Myracle commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1039490326


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        if (!getJobManagerRunner(jobId).isPresent()) {
+            log.warn("Fail to find job {} for client.", jobId);
+        } else {
+            log.debug("Job {} receives client's heartbeat.", jobId);
+            markJobClientAliveness(jobId);
+            resetJobClientAlivenessCheck(jobId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markJobClientAliveness(JobID jobId) {
+        if (jobClientAlivenessFutures.containsKey(jobId)) {
+            jobClientAlivenessFutures.get(jobId).complete(null);
+        }
+    }
+
+    private void resetJobClientAlivenessCheck(JobID jobId) {
+        CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+        jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+        FutureUtils.orTimeout(
+                clientAlivenessFuture,
+                jobClientHeartbeatTimeout,
+                TimeUnit.MILLISECONDS,
+                getMainThreadExecutor());
+        clientAlivenessFuture.whenComplete(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   @xintongsong Agreed, fixed.



[GitHub] [flink] Myracle commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1053272512


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Receive the heartbeat from the client. */
+public class JobClientHeartbeatHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                JobClientHeartbeatRequestBody,
+                EmptyResponseBody,
+                JobClientHeartbeatParameters> {
+    public JobClientHeartbeatHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> headers,
+            MessageHeaders<
+                            JobClientHeartbeatRequestBody,
+                            EmptyResponseBody,
+                            JobClientHeartbeatParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, headers, messageHeaders);
+    }
+
+    @Override
+    public CompletableFuture<EmptyResponseBody> handleRequest(
+            HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+            throws RestHandlerException {
+        final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        gateway.reportJobClientHeartbeat(
+                jobId, request.getRequestBody().getExpiredTimestamp(), timeout);

Review Comment:
   The client only needs to report its heartbeat; the dispatcher decides what to do on timeout. There is no need to return a `CompletableFuture`, since the client does not process the result. What do you think?



[GitHub] [flink] xintongsong closed pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong closed pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…
URL: https://github.com/apache/flink/pull/21347


[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1057133875


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -652,4 +654,12 @@ public void setJobStatusHooks(List<JobStatusHook> hooks) {
     public List<JobStatusHook> getJobStatusHooks() {
         return this.jobStatusHooks;
     }
+
+    public long getClientHeartbeatTimeout() {
+        return clientHeartbeatTimeout;
+    }
+
+    public void setClientHeartbeatTimeout(long clientHeartbeatTimeout) {
+        this.clientHeartbeatTimeout = clientHeartbeatTimeout;
+    }

Review Comment:
   1. I'd suggest naming this `get/setInitialClientHeartbeatTimeout`.
   2. This might be better placed in `jobConfiguration` than as a direct field of `JobGraph`.
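
   A minimal sketch of both points, assuming the value travels as an entry in the job configuration under the `initialClientHeartbeatTimeout` key (the constant from the earlier `ConfigConstants` diff, here kept next to the job graph instead) and that the dispatcher falls back to its own timeout when the entry is absent. `JobGraphStub` is hypothetical, and a plain `Map<String, String>` stands in for Flink's `Configuration`.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Hypothetical, simplified stand-in for a JobGraph that carries a job configuration. */
   final class JobGraphStub {
       /** Internal key kept with the job graph rather than in the public ConfigConstants. */
       static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT = "initialClientHeartbeatTimeout";

       private final Map<String, String> jobConfiguration = new HashMap<>();

       void setInitialClientHeartbeatTimeout(long timeoutMillis) {
           jobConfiguration.put(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, Long.toString(timeoutMillis));
       }

       /** Returns the client-provided timeout, or defaultMillis if the client did not set one. */
       long getInitialClientHeartbeatTimeout(long defaultMillis) {
           String value = jobConfiguration.get(INITIAL_CLIENT_HEARTBEAT_TIMEOUT);
           return value == null ? defaultMillis : Long.parseLong(value);
       }
   }
   ```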



##########
flink-clients/src/main/java/org/apache/flink/client/cli/ClientOptions.java:
##########
@@ -42,4 +47,35 @@ public class ClientOptions {
                     .withDescription(
                             "The interval (in ms) between consecutive retries of failed attempts to execute "
                                     + "commands through the CLI or Flink's clients, wherever retry is supported (default 2sec).");
+
+    /** Timeout for job client to report its heartbeat. */
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+            key("client.heartbeat.timeout")
+                    .longType()
+                    .defaultValue(18000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Cancel the job if the dispatcher hasn't received the client's"
+                                                    + " heartbeat after timeout when '%s' and '%s' are both true.",
+                                            TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+
+    /** Time interval for job client to report its heartbeat. */
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+            key("client.heartbeat.interval")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Time interval for job client to report its heartbeat "
+                                                    + "when '%s' and '%s' are both true. Cancel the job if timeout configured by '%s'.",
+                                            TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+                                            TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+                                    .build());

Review Comment:
   The default values don't look right: the interval (30000 ms) is larger than the timeout (18000 ms).
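
   With those defaults, a client reporting every 30000 ms would miss an 18000 ms deadline on every cycle. A sketch of a sanity check that would reject such settings when the options are read; `ClientHeartbeatOptionsCheck.validateClientHeartbeat` is a hypothetical helper, not an existing Flink utility, and it takes the stricter position that an interval equal to the timeout is also invalid.

   ```java
   /** Hypothetical validation helper for the client heartbeat options. */
   final class ClientHeartbeatOptionsCheck {
       private ClientHeartbeatOptionsCheck() {}

       /** Rejects settings where the client cannot reliably report before the deadline. */
       static void validateClientHeartbeat(long intervalMillis, long timeoutMillis) {
           if (intervalMillis <= 0 || timeoutMillis <= 0) {
               throw new IllegalArgumentException("Heartbeat interval and timeout must be positive.");
           }
           // An interval equal to the timeout leaves no slack for network delay, so it is rejected too.
           if (intervalMillis >= timeoutMillis) {
               throw new IllegalArgumentException(
                       String.format(
                               "client.heartbeat.interval (%d ms) must be smaller than "
                                       + "client.heartbeat.timeout (%d ms).",
                               intervalMillis, timeoutMillis));
           }
       }
   }
   ```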



##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -205,6 +205,24 @@ public class ClusterOptions {
                                             TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
                                     .build());
 
+    public static final ConfigOption<Long> CLIENT_ALIVENESS_CHECK_INTERVAL =
+            ConfigOptions.key("cluster.client-aliveness-check.interval")
+                    .longType()
+                    .defaultValue(60000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The interval to check whether the client's "
+                                                    + "heartbeat is timeout when '%s' and '%s' are "
+                                                    + "both true. Cancel the job if timeout. The "
+                                                    + "client's heartbeat interval and timeout are "
+                                                    + "set by 'client.heartbeat.interval' and "
+                                                    + "'client.heartbeat.timeout'.",
+                                            TextElement.text(DeploymentOptions.ATTACHED.key()),
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+

Review Comment:
   I'm not sure we need a dedicated config option for the checking interval. The check doesn't need to be very accurate, so we can probably just use the configured client heartbeat interval on the cluster side.
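
   A sketch of that idea: the cluster schedules its timeout sweep using the `client.heartbeat.interval` value from its own configuration instead of a dedicated `cluster.client-aliveness-check.interval` option. `AlivenessCheckScheduling` is hypothetical, a `Map<String, String>` stands in for Flink's `Configuration`, and the 30000 ms fallback simply mirrors the default in the `ClientOptions` diff above.

   ```java
   import java.util.Map;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical helper: derive the sweep period from the client heartbeat interval. */
   final class AlivenessCheckScheduling {
       static final String CLIENT_HEARTBEAT_INTERVAL_KEY = "client.heartbeat.interval";
       static final long DEFAULT_INTERVAL_MILLIS = 30000L;

       private AlivenessCheckScheduling() {}

       /** The sweep period is the client heartbeat interval as configured on the cluster side. */
       static long checkIntervalMillis(Map<String, String> clusterConfig) {
           String raw = clusterConfig.get(CLIENT_HEARTBEAT_INTERVAL_KEY);
           return raw == null ? DEFAULT_INTERVAL_MILLIS : Long.parseLong(raw.trim());
       }

       /** Schedules the sweep; a timed-out job is detected at most one interval late. */
       static ScheduledFuture<?> scheduleCheck(
               ScheduledExecutorService executor,
               Map<String, String> clusterConfig,
               Runnable checkTimedOutJobs) {
           long interval = checkIntervalMillis(clusterConfig);
           return executor.scheduleWithFixedDelay(
                   checkTimedOutJobs, interval, interval, TimeUnit.MILLISECONDS);
       }
   }
   ```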



[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1051748186


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1344,6 +1344,13 @@ public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
                                         metaInfoMap -> new HashSet<>(metaInfoMap.keySet())));
     }
 
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return runDispatcherCommand(
+                dispatcherGateway ->
+                        dispatcherGateway.reportJobClientHeartbeat(
+                                jobId, expiredTimestamp, rpcTimeout));
+    }

Review Comment:
   I wonder if this feature should be supported for `MiniCluster`. In local mode, the client and the mini cluster run in the same process, so heartbeats should not be necessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Receive the heartbeat from the client. */
+public class JobClientHeartbeatHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                JobClientHeartbeatRequestBody,
+                EmptyResponseBody,
+                JobClientHeartbeatParameters> {
+    public JobClientHeartbeatHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> headers,
+            MessageHeaders<
+                            JobClientHeartbeatRequestBody,
+                            EmptyResponseBody,
+                            JobClientHeartbeatParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, headers, messageHeaders);
+    }
+
+    @Override
+    public CompletableFuture<EmptyResponseBody> handleRequest(
+            HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+            throws RestHandlerException {
+        final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        gateway.reportJobClientHeartbeat(
+                jobId, request.getRequestBody().getExpiredTimestamp(), timeout);

Review Comment:
   The `CompletableFuture` returned from `RestfulGateway#reportJobClientHeartbeat` is discarded. That means if anything goes wrong, the exception will be ignored.
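
   A minimal sketch of chaining the gateway call into the handler's response, so that a failed report fails the REST response instead of being dropped. `HeartbeatGatewayStub`, `EmptyBody`, and `HeartbeatHandlerSketch#handleHeartbeat` are hypothetical stand-ins for `RestfulGateway`, `EmptyResponseBody`, and `JobClientHeartbeatHandler#handleRequest`.

   ```java
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical stand-in for the gateway's heartbeat method. */
   interface HeartbeatGatewayStub {
       CompletableFuture<Void> reportJobClientHeartbeat(String jobId, long expiredTimestamp);
   }

   /** Hypothetical stand-in for an empty REST response body. */
   final class EmptyBody {
       static final EmptyBody INSTANCE = new EmptyBody();

       private EmptyBody() {}
   }

   /** Sketch: the handler returns the gateway's future instead of discarding it. */
   final class HeartbeatHandlerSketch {
       private final HeartbeatGatewayStub gateway;

       HeartbeatHandlerSketch(HeartbeatGatewayStub gateway) {
           this.gateway = gateway;
       }

       /** Maps success to an empty body; an exceptional gateway future fails the response too. */
       CompletableFuture<EmptyBody> handleHeartbeat(String jobId, long expiredTimestamp) {
           return gateway.reportJobClientHeartbeat(jobId, expiredTimestamp)
                   .thenApply(ignored -> EmptyBody.INSTANCE);
       }
   }
   ```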



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -603,6 +634,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
 
         final JobID jobId = jobManagerRunner.getJobID();
 
+        if (configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
+            log.info("Begin to receive the client's heartbeat for aliveness check.");
+            // Use client timeout from the configuration when the job is submitting
+            // but the client hasn't reported heartbeat.

Review Comment:
   This could result in jobs being canceled unexpectedly if the client-side heartbeat interval is larger than the cluster-side timeout.



##########
flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java:
##########
@@ -71,6 +71,37 @@ public class HeartbeatManagerOptions {
                                             TextElement.code("-1"))
                                     .build());
 
+    /** Timeout for job client to report its heartbeat. */
+    @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_TIMEOUT =
+            key("client.heartbeat.timeout")
+                    .longType()
+                    .defaultValue(18000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Cancel the job if the dispatcher hasn't received the client's"
+                                                    + " heartbeat after timeout when '%s' is set true.",
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()))
+                                    .build());
+
+    /** Time interval for job client to report its heartbeat. */
+    @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+    public static final ConfigOption<Long> CLIENT_HEARTBEAT_INTERVAL =
+            key("client.heartbeat.interval")
+                    .longType()
+                    .defaultValue(30000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Time interval for job client to report its heartbeat "
+                                                    + "when '%s' is set true. Cancel the job if timeout configured by '%s'.",
+                                            TextElement.text(
+                                                    DeploymentOptions.SHUTDOWN_IF_ATTACHED.key()),
+                                            TextElement.text(CLIENT_HEARTBEAT_TIMEOUT.key()))
+                                    .build());
+

Review Comment:
   I think these two options do not belong in `HeartbeatManagerOptions`, because the feature does not use `HeartbeatManager`. I'd suggest moving them to `ClientOptions`.



##########
flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java:
##########
@@ -144,4 +149,29 @@ public static void waitUntilJobInitializationFinished(
             throw new RuntimeException("Error while waiting for job to be initialized", throwable);
         }
     }
+
+    /**
+     * The client reports the heartbeat to the dispatcher for aliveness.
+     *
+     * @param jobClient The job client.
+     * @param interval The heartbeat interval.
+     * @param timeout The heartbeat timeout.
+     * @return The ScheduledExecutorService which reports heartbeat periodically.
+     */
+    public static ScheduledExecutorService reportHeartbeatPeriodically(
+            JobClient jobClient, long interval, long timeout) {

Review Comment:
   It would be better to check that `timeout` is larger than `interval` and, if it is not, fail with a descriptive error message.
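A minimal sketch of what such a check could look like, using only `java.util.concurrent`. `HeartbeatSender` is a hypothetical stand-in for the `JobClient` parameter, and the exact message wording is illustrative. The task body also catches runtime exceptions, because `scheduleAtFixedRate` suppresses all subsequent runs once a run throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatReporter {

    /** Hypothetical stand-in for the job client's heartbeat call. */
    interface HeartbeatSender {
        void reportHeartbeat(long expiredTimestamp);
    }

    static ScheduledExecutorService reportHeartbeatPeriodically(
            HeartbeatSender sender, long interval, long timeout) {
        // Validate eagerly so a misconfiguration fails on the client with a clear
        // message instead of the cluster canceling a healthy job later.
        if (interval <= 0) {
            throw new IllegalArgumentException(
                    "The heartbeat interval must be positive, but was " + interval + " ms.");
        }
        if (timeout <= interval) {
            throw new IllegalArgumentException(
                    String.format(
                            "The heartbeat timeout (%d ms) must be larger than the heartbeat interval (%d ms),"
                                    + " otherwise the job may be canceled while the client is still alive.",
                            timeout, interval));
        }
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(
                () -> {
                    try {
                        sender.reportHeartbeat(System.currentTimeMillis() + timeout);
                    } catch (RuntimeException e) {
                        // An uncaught exception would silently stop all future runs of this task.
                        System.err.println("Failed to report heartbeat: " + e);
                    }
                },
                0L,
                interval,
                TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

Inside Flink itself, `org.apache.flink.util.Preconditions.checkArgument` would be the more idiomatic way to express the same check.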



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on PR #21347:
URL: https://github.com/apache/flink/pull/21347#issuecomment-1363554877

   @xintongsong Could you take a look? Thanks.


[GitHub] [flink] Myracle commented on pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on PR #21347:
URL: https://github.com/apache/flink/pull/21347#issuecomment-1336607971

   @flinkbot run azure


[GitHub] [flink] Myracle commented on pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on PR #21347:
URL: https://github.com/apache/flink/pull/21347#issuecomment-1352472772

   @xintongsong Could you take a look? Thanks.


[GitHub] [flink] Myracle commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1053283384


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1344,6 +1344,13 @@ public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
                                         metaInfoMap -> new HashSet<>(metaInfoMap.keySet())));
     }
 
+    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
+        return runDispatcherCommand(
+                dispatcherGateway ->
+                        dispatcherGateway.reportJobClientHeartbeat(
+                                jobId, expiredTimestamp, rpcTimeout));
+    }

Review Comment:
   The client's heartbeat is only sent when both `execution.attached` and `execution.shutdown-on-attached-exit` are true, so most of the time it is not used in MiniCluster. Keeping it there makes tests easier to write and keeps MiniCluster consistent with the other cluster modes. Do you still think we should remove it from MiniCluster?



[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1053913714


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobClientHeartbeatHandler.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
+import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Receive the heartbeat from the client. */
+public class JobClientHeartbeatHandler
+        extends AbstractRestHandler<
+                RestfulGateway,
+                JobClientHeartbeatRequestBody,
+                EmptyResponseBody,
+                JobClientHeartbeatParameters> {
+    public JobClientHeartbeatHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> headers,
+            MessageHeaders<
+                            JobClientHeartbeatRequestBody,
+                            EmptyResponseBody,
+                            JobClientHeartbeatParameters>
+                    messageHeaders) {
+        super(leaderRetriever, timeout, headers, messageHeaders);
+    }
+
+    @Override
+    public CompletableFuture<EmptyResponseBody> handleRequest(
+            HandlerRequest<JobClientHeartbeatRequestBody> request, RestfulGateway gateway)
+            throws RestHandlerException {
+        final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        gateway.reportJobClientHeartbeat(
+                jobId, request.getRequestBody().getExpiredTimestamp(), timeout);

Review Comment:
   We probably don't need to return the `CompletableFuture` to the client side, but the REST handler should check the future returned from the gateway and handle the exception if that future completes exceptionally.
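A minimal sketch of the suggested handling, using plain `java.util.concurrent` types so it runs on its own. `Gateway`, `EmptyResponse`, and `HandlerException` are hypothetical stand-ins for `RestfulGateway`, `EmptyResponseBody`, and `RestHandlerException`, and the 500 status code is an assumption. The handler chains on the gateway's future: a successful heartbeat maps to an empty acknowledgement, while a failure is unwrapped and re-raised so the REST layer can report it instead of dropping it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HeartbeatHandlerSketch {

    /** Hypothetical stand-in for EmptyResponseBody. */
    enum EmptyResponse {
        INSTANCE
    }

    /** Hypothetical stand-in for RestHandlerException carrying an HTTP status code. */
    static final class HandlerException extends Exception {
        final int httpStatus;

        HandlerException(String message, int httpStatus, Throwable cause) {
            super(message, cause);
            this.httpStatus = httpStatus;
        }
    }

    /** Hypothetical stand-in for the gateway's heartbeat RPC. */
    interface Gateway {
        CompletableFuture<Void> reportJobClientHeartbeat(String jobId, long expiredTimestamp);
    }

    static CompletableFuture<EmptyResponse> handleRequest(
            Gateway gateway, String jobId, long expiredTimestamp) {
        return gateway.reportJobClientHeartbeat(jobId, expiredTimestamp)
                // Success: the client only needs an empty acknowledgement.
                .thenApply(ignored -> EmptyResponse.INSTANCE)
                // Failure: surface the error to the REST layer instead of dropping it.
                .exceptionally(
                        error -> {
                            // Dependent stages wrap the original failure in a CompletionException.
                            Throwable cause =
                                    error instanceof CompletionException && error.getCause() != null
                                            ? error.getCause()
                                            : error;
                            throw new CompletionException(
                                    new HandlerException(
                                            "Failed to report heartbeat for job " + jobId, 500, cause));
                        });
    }
}
```

In the actual handler, the success branch would likely map to `EmptyResponseBody.getInstance()`, so the client still receives an empty body while failures are no longer lost.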


