You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/07 11:53:05 UTC

[linkis] branch dev-1.3.2 updated: [feat]Entrance starts to clear the running tasks of the node within one day (#4321)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 93febf94d [feat]Entrance starts to clear the running tasks of the node within one day (#4321)
93febf94d is described below

commit 93febf94d32531649fb1af3bfbe5dc87b0562c3f
Author: huangKai-2323 <62...@users.noreply.github.com>
AuthorDate: Tue Mar 7 19:52:59 2023 +0800

    [feat]Entrance starts to clear the running tasks of the node within one day (#4321)
---
 docs/info-1.3.2.md                                 |  3 +-
 .../protocol/conf/EntranceInstanceConf.scala       | 26 ++++++++++++
 .../entrance/server/DefaultEntranceServer.java     | 14 +++++++
 .../entrance/conf/EntranceConfiguration.scala      |  3 ++
 .../linkis/jobhistory/dao/JobHistoryMapper.java    |  3 ++
 .../errorcode/JobhistoryErrorCodeSummary.java      | 46 ++++++++++++++++++++++
 .../resources/mapper/common/JobHistoryMapper.xml   | 12 +++++-
 .../jobhistory/service/JobHistoryQueryService.java |  4 ++
 .../service/impl/JobHistoryQueryServiceImpl.scala  | 37 ++++++++++++++++-
 9 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/docs/info-1.3.2.md b/docs/info-1.3.2.md
index c365d81f8..55e5aeecb 100644
--- a/docs/info-1.3.2.md
+++ b/docs/info-1.3.2.md
@@ -4,5 +4,4 @@
 |------------------| ----- |----------------------------------------------------------------------|------| ------------------------------------------------------- |
 | linkis-jobhistory | 新增  | wds.linkis.jobhistory.admin | hadoop |可以查看所有历史任务的用户 注意:wds.linkis.governance.station.admin 为管理用户(也具有可以查看所有历史任务的权限)|
 | linkis | 新增  | wds.linkis.governance.station.admin.token |   /具有管理员权限的特殊token|
-
-
+| cg-entrance | 新增  | linkis.entrance.auto.clean.dirty.data.enable | true |entrance重启时是否调用ps-jobhistory接口清理本实例一天内未完成的任务,true为开启,取值范围:true或false|
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/EntranceInstanceConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/EntranceInstanceConf.scala
new file mode 100644
index 000000000..3a237d49e
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/EntranceInstanceConf.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.protocol.conf
+
+import org.apache.linkis.protocol.message.RequestProtocol
+
+trait EntranceInstanceConf extends RequestProtocol // base type of EntranceInstanceConfRequest
+// Request carrying a single instance string identifying the entrance node it is about.
+case class EntranceInstanceConfRequest(instance: String) extends EntranceInstanceConf
+// Parameterless reply type; NOTE(review): no use of it is visible here - confirm it is needed.
+case class EntranceInstanceConfResponse()
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index a050056fe..430133496 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -17,11 +17,14 @@
 
 package org.apache.linkis.entrance.server;
 
+import org.apache.linkis.common.ServiceInstance;
 import org.apache.linkis.entrance.EntranceContext;
 import org.apache.linkis.entrance.EntranceServer;
+import org.apache.linkis.entrance.conf.EntranceConfiguration$;
 import org.apache.linkis.entrance.constant.ServiceNameConsts;
 import org.apache.linkis.entrance.execute.EntranceJob;
 import org.apache.linkis.entrance.log.LogReader;
+import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest;
 import org.apache.linkis.rpc.Sender;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,6 +58,17 @@ public class DefaultEntranceServer extends EntranceServer {
   public void init() {
     getEntranceWebSocketService();
     addRunningJobEngineStatusMonitor();
+    cleanUpEntranceDirtyData();
+  }
+
+  private void cleanUpEntranceDirtyData() {
+    // If enabled, send this instance's EntranceInstanceConfRequest to the job-history application.
+    EntranceConfiguration$ conf = EntranceConfiguration$.MODULE$;
+    boolean cleanEnabled = (Boolean) conf.ENABLE_ENTRANCE_DIRTY_DATA_CLEAR().getValue();
+    if (!cleanEnabled) return;
+    Sender historySender = Sender.getSender(conf.JOBHISTORY_SPRING_APPLICATION_NAME().getValue());
+    ServiceInstance self = Sender.getThisServiceInstance();
+    historySender.ask(new EntranceInstanceConfRequest(self.getInstance()));
   }
 
   @Override
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index fca5a61aa..5c61ce0b3 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -220,4 +220,7 @@ object EntranceConfiguration {
   val CREATOR_IP_SWITCH =
     CommonVars("wds.linkis.entrance.user.creator.ip.interceptor.switch", false)
 
+  val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR = // default true; switch for the dirty-data cleanup
+    CommonVars("linkis.entrance.auto.clean.dirty.data.enable", true)
+
 }
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
index 1403a29ed..7bb765634 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
@@ -105,4 +105,7 @@ public interface JobHistoryMapper {
   String selectJobHistoryStatusForUpdate(Long jobId);
 
   void updateOberverById(@Param("taskid") Long taskid, @Param("observeInfo") String observeInfo);
+  /** Batch update keyed by job ids; {@code errorDesc} is bound to the statement's #{errorDesc}. */
+  void updateJobHistoryCancelById(
+      @Param("idList") List<Long> idList, @Param("errorDesc") String errorDesc);
 }
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/errorcode/JobhistoryErrorCodeSummary.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/errorcode/JobhistoryErrorCodeSummary.java
new file mode 100644
index 000000000..5c62944d5
--- /dev/null
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/errorcode/JobhistoryErrorCodeSummary.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.jobhistory.errorcode;
+
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum JobhistoryErrorCodeSummary implements LinkisErrorCode { // job-history error codes
+  UNFINISHED_TASKS(
+      20020, // numeric code paired with the description on the next line
+      "entrance service restart, automatically cancel tasks that have not been completed for a long time (24h)");
+
+  /** Numeric error code. */
+  private final int errorCode;
+  /** Error description text. */
+  private final String errorDesc;
+
+  JobhistoryErrorCodeSummary(int errorCode, String errorDesc) {
+    this.errorCode = errorCode;
+    this.errorDesc = errorDesc;
+  }
+
+  @Override
+  public int getErrorCode() {
+    return errorCode;
+  }
+
+  @Override
+  public String getErrorDesc() {
+    return errorDesc;
+  }
+}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
index 8ac85a7c4..824d989c6 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
@@ -15,7 +15,7 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-  
+
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 
 <mapper namespace="org.apache.linkis.jobhistory.dao.JobHistoryMapper">
@@ -149,7 +149,7 @@
             <if test="jobReqId != null">job_req_id = #{jobReqId},</if>
             <if test="submitUser != null">submit_user = #{submitUser},</if>
             <if test="executeUser != null">execute_user = #{executeUser},</if>
-            <if test="source != null">source = #{source},</if>
+            <if test="source != null">`source` = #{source},</if>
             <if test="labels != null">labels = #{labels},</if>
             <if test="params != null">params = #{params},</if>
             <if test="progress != null">progress = #{progress},</if>
@@ -221,4 +221,12 @@
     <update id="updateOberverById">
         update linkis_ps_job_history_group_history set observe_info = #{observeInfo} where id = #{taskid}
     </update>
+    <!-- Sets status to 'Cancelled' and error_desc to the given text for every id in idList. -->
+    <update id="updateJobHistoryCancelById" >
+        update linkis_ps_job_history_group_history set status = 'Cancelled' ,error_desc = #{errorDesc}
+        WHERE id IN
+        <foreach collection="idList" item="id" separator="," open="(" close=")">
+            #{id}
+        </foreach>
+    </update>
 </mapper>
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java
index b23873890..433cbe047 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java
@@ -18,8 +18,10 @@
 package org.apache.linkis.jobhistory.service;
 
 import org.apache.linkis.governance.common.entity.job.JobRequest;
+import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest;
 import org.apache.linkis.governance.common.protocol.job.*;
 import org.apache.linkis.jobhistory.entity.JobHistory;
+import org.apache.linkis.rpc.Sender;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -47,4 +49,6 @@ public interface JobHistoryQueryService {
     List<JobRequest> getQueryVOList(List<JobHistory> list);
 
     void changeObserveInfoById(JobHistory jobHistory);
+    /** Marks the not-yet-finished tasks of the entrance instance named in the request as cancelled. */
+    void clearUndoneTasksByEntranceInstance(EntranceInstanceConfRequest request, Sender sender);
 }
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index 3512d3fbf..6778276d3 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -26,31 +26,35 @@ import org.apache.linkis.governance.common.entity.job.{
   QueryException,
   SubJobDetail
 }
+import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest
 import org.apache.linkis.governance.common.protocol.job._
 import org.apache.linkis.jobhistory.conversions.TaskConversions._
 import org.apache.linkis.jobhistory.dao.JobHistoryMapper
 import org.apache.linkis.jobhistory.entity.{JobHistory, QueryJobHistory}
+import org.apache.linkis.jobhistory.errorcode.JobhistoryErrorCodeSummary
 import org.apache.linkis.jobhistory.service.JobHistoryQueryService
 import org.apache.linkis.jobhistory.transitional.TaskStatus
 import org.apache.linkis.jobhistory.util.QueryUtils
 import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.rpc.Sender
 import org.apache.linkis.rpc.message.annotation.Receiver
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.commons.lang3.time.DateUtils
 
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
 
 import java.{lang, util}
 import java.sql.Timestamp
-import java.util.Date
+import java.util.{Calendar, Date}
 import java.util.concurrent.{Callable, TimeUnit}
 
 import scala.collection.JavaConverters._
 
 import com.google.common.cache.{Cache, CacheBuilder}
-import com.google.common.collect.Iterables
+import com.google.common.collect.{Iterables, Lists}
 
 @Service
 class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
@@ -428,4 +432,33 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
     jobHistoryMapper.updateOberverById(jobHistory.getId, jobHistory.getObserveInfo)
   }
 
+  /**
+   * Cancels the unfinished tasks that the history search returns for the requesting entrance
+   * instance and a one-day window ending now; UNFINISHED_TASKS' text becomes their error_desc.
+   */
+  @Receiver
+  override def clearUndoneTasksByEntranceInstance(
+      request: EntranceInstanceConfRequest,
+      sender: Sender
+  ): Unit = {
+    logger.info("Request Entrance Instance :{}", request.instance)
+    val unfinishedStatuses: util.List[String] =
+      Seq(TaskStatus.WaitForRetry, TaskStatus.Inited, TaskStatus.Scheduled, TaskStatus.Running)
+        .map(_.toString).asJava
+    val endDate = new Date(System.currentTimeMillis)
+    val startDate = DateUtils.addDays(endDate, -1)
+    val staleJobs = jobHistoryMapper
+      .search(null, null, unfinishedStatuses, startDate, endDate, null, null, request.instance)
+    val staleIds = staleJobs.asScala.map(_.getId).asJava
+    logger.info("Tasks id will be canceled ids :{}", staleIds)
+    // A large batch is unusual but not a failure: warn with the real count and carry on.
+    if (staleIds.size() >= 1000) {
+      logger.warn(s"The number of batch modification tasks is ${staleIds.size()} (>= 1000)")
+    }
+    val errorMsg = JobhistoryErrorCodeSummary.UNFINISHED_TASKS.getErrorDesc
+    // Update at most 100 ids per statement; an empty id list yields no chunks, so no SQL runs.
+    val chunks = Lists.partition(staleIds, 100).asScala
+    chunks.foreach(jobHistoryMapper.updateJobHistoryCancelById(_, errorMsg))
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org