You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@streampark.apache.org by "1996fanrui (via GitHub)" <gi...@apache.org> on 2023/02/02 13:28:08 UTC

[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2268: [Feature][Issue-2192] Support trigger savepoint manually.

1996fanrui commented on code in PR #2268:
URL: https://github.com/apache/incubator-streampark/pull/2268#discussion_r1094488180


##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
 create table `t_flink_savepoint` (
   `id` bigint not null auto_increment,
   `app_id` bigint not null,
+  `job_id`           VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,

Review Comment:
   Why need the `job_id`?



##########
streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql:
##########
@@ -117,6 +117,7 @@ insert into `t_menu` values (100066, 100015, 'view', null, null, 'app:view', nul
 insert into `t_menu` values (100067, 100053, 'view', NULL, NULL, 'variable:view', NULL, '1', 1, null, now(), now());
 insert into `t_menu` values (100068, 100033, 'view', null, null, 'setting:view', null, '1', 1, null, now(), now());
 insert into `t_menu` values (100069, 100053, 'depend view', null, null, 'variable:depend_apps', null, '1', 1, NULL, now(), now());
+insert into `t_menu` values (100070, 100015, 'savepoint trigger', null, null, 'savepoint:triggerSavepoint', null, '1', 1, null, now(), now());

Review Comment:
   How about change the `savepoint:triggerSavepoint` to `savepoint:trigger`? It is clear to trigger savepoint.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java:
##########
@@ -108,6 +112,23 @@ public void process(Long appId, CheckPoints checkPoints) {
     }
   }
 
+  private boolean shouldProcessFailedTrigger(
+      CheckPoints.CheckPoint checkPoint, Application application, CheckPointStatus status) {
+    return CheckPointStatus.FAILED.equals(status)
+        && !checkPoint.getIsSavepoint()
+        && application.cpFailedTrigger();
+  }
+
+  private boolean checkpointNeedStore(
+      Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
+    LambdaQueryWrapper<SavePoint> queryWrapper =
+        new LambdaQueryWrapper<SavePoint>()
+            .eq(SavePoint::getAppId, application.getAppId())
+            .eq(SavePoint::getJobId, application.getJobId())

Review Comment:
   We don't suggest add the jobId here, it's better to using the `instance_id` in the future, `instance_id` is more intuitively.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java:
##########
@@ -108,6 +112,23 @@ public void process(Long appId, CheckPoints checkPoints) {
     }
   }
 
+  private boolean shouldProcessFailedTrigger(
+      CheckPoints.CheckPoint checkPoint, Application application, CheckPointStatus status) {
+    return CheckPointStatus.FAILED.equals(status)
+        && !checkPoint.getIsSavepoint()
+        && application.cpFailedTrigger();
+  }
+
+  private boolean checkpointNeedStore(
+      Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
+    LambdaQueryWrapper<SavePoint> queryWrapper =

Review Comment:
   We should check the `checkPointCache` first, right?



##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
 create table `t_flink_savepoint` (
   `id` bigint not null auto_increment,
   `app_id` bigint not null,
+  `job_id`           VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+  `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED',

Review Comment:
   I see the checkpoint or savepoint is saved only when `status` is 'COMPLETED', right?
   
   If yes, why need the  `status`?



##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
 create table `t_flink_savepoint` (
   `id` bigint not null auto_increment,
   `app_id` bigint not null,
+  `job_id`           VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+  `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED',
   `chk_id` bigint default null,
   `type` tinyint default null,
-  `path` varchar(255) collate utf8mb4_general_ci default null,
+  `path` varchar(2048) collate utf8mb4_general_ci default null,
   `latest` tinyint not null default 1,
   `trigger_time` datetime default null,
+  `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,

Review Comment:
   What does the `execution_mode` mean here? And why need it here?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1267,8 +1268,14 @@ public void cancel(Application appParam) throws Exception {
                 savePoint.setAppId(application.getId());
                 savePoint.setLatest(true);
                 savePoint.setType(CheckPointType.SAVEPOINT.get());
-                savePoint.setTriggerTime(now);
+                savePoint.setTriggerTime(triggerTime);
                 savePoint.setCreateTime(now);
+                savePoint.setJobId(cancelRequest.jobId());
+                savePoint.setEndTime(now);
+                savePoint.setClusterId(application.getFlinkClusterId());
+                if (appParam.getExecutionMode() != null) {
+                  savePoint.setExecutionMode(ExecutionMode.of(appParam.getExecutionMode()).name());

Review Comment:
   `ExecutionMode.of(appParam.getExecutionMode()).name()` == `appParam.getExecutionMode()`, right?
   
   If yes, how about `savePoint.setExecutionMode(appParam.getExecutionMode())`?
   
   And the `if (appParam.getExecutionMode() != null) ` can be removed.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java:
##########
@@ -357,7 +357,9 @@ private void handleCheckPoints(Application application) throws Exception {
     FlinkCluster flinkCluster = getFlinkCluster(application);
     CheckPoints checkPoints = httpCheckpoints(application, flinkCluster);
     if (checkPoints != null) {
-      checkpointProcessor.process(application.getId(), checkPoints);
+      checkPoints
+          .getLatestChkAndSavepointWithTimeAscOrder()

Review Comment:
   Why `getLatestChkAndSavepointWithTimeAscOrder ` return a list? And why need process all checkpoints and savepoints?  The  latest checkpoint and latest savepoint should be enough,  right?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -36,19 +38,30 @@
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Nullable;
+
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
 public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint>
     implements SavePointService {
 
+  private static final ObjectMapper objMapper = RestMapperUtils.getStrictObjectMapper();

Review Comment:
   The static field name should be uppercase: `OBJECT_MAPPER`



##########
streampark-console/streampark-console-service/src/main/assembly/script/upgrade/ISSUE-2192/schema/2192-mysql.sql:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+alter table `t_flink_savepoint` add column `job_id` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL after `app_id`;

Review Comment:
   The upgrade sql should be moved to [here](https://github.com/apache/incubator-streampark/tree/dev/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql). And file name should be `2.1.0.sql`.



##########
streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql:
##########
@@ -210,12 +210,19 @@ drop table if exists `t_flink_savepoint`;
 create table `t_flink_savepoint` (
   `id` bigint not null auto_increment,
   `app_id` bigint not null,
+  `job_id`           VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+  `status` VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'COMPLETED',
   `chk_id` bigint default null,
   `type` tinyint default null,
-  `path` varchar(255) collate utf8mb4_general_ci default null,
+  `path` varchar(2048) collate utf8mb4_general_ci default null,
   `latest` tinyint not null default 1,
   `trigger_time` datetime default null,
+  `execution_mode` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
+  `cluster_id` bigint DEFAULT NULL,
+  `end_time`    datetime DEFAULT NULL,

Review Comment:
   Why need the `end_time `?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -114,6 +127,42 @@ public SavePoint getLatest(Long id) {
     return this.getOne(queryWrapper);
   }
 
+  @Override
+  public Boolean triggerSavepoint(Long appId, @Nullable String savepointPath) {
+    log.info("Start to trigger savepoint for app {}", appId);
+    Application app = appService.getById(appId);
+    AssertUtils.notNull(app, String.format("The application %s doesn't exist.", appId));
+    String trackingUrl = app.getJobManagerUrl();
+    AssertUtils.state(
+        StringUtils.isNotEmpty(trackingUrl),
+        String.format("The flink trackingUrl for app[%s] isn't available.", appId));
+    String jobId = app.getJobId();
+    AssertUtils.state(
+        StringUtils.isNotEmpty(jobId),
+        String.format("The jobId of application[%s] is absent.", appId));
+    String triggerId = triggerSavepoint(trackingUrl, jobId, savepointPath);
+    log.info("Request savepoint successful in triggerId {}", triggerId);
+    return true;

Review Comment:
   This method either returns true or throws an exception. Never returns false, so the return value is meaningless and can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org