You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/07 11:53:05 UTC
[linkis] branch dev-1.3.2 updated: [feat]Entrance starts to clear the running tasks of the node within one day (#4321)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 93febf94d [feat]Entrance starts to clear the running tasks of the node within one day (#4321)
93febf94d is described below
commit 93febf94d32531649fb1af3bfbe5dc87b0562c3f
Author: huangKai-2323 <62...@users.noreply.github.com>
AuthorDate: Tue Mar 7 19:52:59 2023 +0800
[feat]Entrance starts to clear the running tasks of the node within one day (#4321)
---
docs/info-1.3.2.md | 3 +-
.../protocol/conf/EntranceInstanceConf.scala | 26 ++++++++++++
.../entrance/server/DefaultEntranceServer.java | 14 +++++++
.../entrance/conf/EntranceConfiguration.scala | 3 ++
.../linkis/jobhistory/dao/JobHistoryMapper.java | 3 ++
.../errorcode/JobhistoryErrorCodeSummary.java | 46 ++++++++++++++++++++++
.../resources/mapper/common/JobHistoryMapper.xml | 12 +++++-
.../jobhistory/service/JobHistoryQueryService.java | 4 ++
.../service/impl/JobHistoryQueryServiceImpl.scala | 37 ++++++++++++++++-
9 files changed, 142 insertions(+), 6 deletions(-)
diff --git a/docs/info-1.3.2.md b/docs/info-1.3.2.md
index c365d81f8..55e5aeecb 100644
--- a/docs/info-1.3.2.md
+++ b/docs/info-1.3.2.md
@@ -4,5 +4,4 @@
|------------------| ----- |----------------------------------------------------------------------|------| ------------------------------------------------------- |
| linkis-jobhistory | 新增 | wds.linkis.jobhistory.admin | hadoop |可以查看所有历史任务的用户 注意:wds.linkis.governance.station.admin 为管理用户(也具有可以查看所有历史任务的权限)|
| linkis | 新增 | wds.linkis.governance.station.admin.token | /具有管理员权限的特殊token|
-
-
+| cg-entrance | 新增 | linkis.entrance.auto.clean.dirty.data.enable | true |entrance重启调用ps-jobhistory接口是否开启,true为开启,取值范围:true或false|
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/EntranceInstanceConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/EntranceInstanceConf.scala
new file mode 100644
index 000000000..3a237d49e
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/EntranceInstanceConf.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.protocol.conf
+
+import org.apache.linkis.protocol.message.RequestProtocol
+
+trait EntranceInstanceConf extends RequestProtocol // base type for entrance-instance RPC messages
+
+case class EntranceInstanceConfRequest(instance: String) extends EntranceInstanceConf // instance: the sending entrance's service-instance string
+
+case class EntranceInstanceConfResponse() // empty payload; no use of it appears in this diff
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index a050056fe..430133496 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -17,11 +17,14 @@
package org.apache.linkis.entrance.server;
+import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.entrance.EntranceContext;
import org.apache.linkis.entrance.EntranceServer;
+import org.apache.linkis.entrance.conf.EntranceConfiguration$;
import org.apache.linkis.entrance.constant.ServiceNameConsts;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.log.LogReader;
+import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest;
import org.apache.linkis.rpc.Sender;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,6 +58,17 @@ public class DefaultEntranceServer extends EntranceServer {
public void init() {
getEntranceWebSocketService();
addRunningJobEngineStatusMonitor();
+ cleanUpEntranceDirtyData();
+ }
+
+ private void cleanUpEntranceDirtyData() { // called from init(): sends this entrance's instance to the jobhistory service
+ if ((Boolean) EntranceConfiguration$.MODULE$.ENABLE_ENTRANCE_DIRTY_DATA_CLEAR().getValue()) { // linkis.entrance.auto.clean.dirty.data.enable (default true)
+ Sender sender =
+ Sender.getSender(
+ EntranceConfiguration$.MODULE$.JOBHISTORY_SPRING_APPLICATION_NAME().getValue());
+ ServiceInstance thisServiceInstance = Sender.getThisServiceInstance();
+ sender.ask(new EntranceInstanceConfRequest(thisServiceInstance.getInstance())); // NOTE(review): reply is discarded and nothing is caught here - confirm a failed ask cannot abort init()
+ }
}
@Override
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index fca5a61aa..5c61ce0b3 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -220,4 +220,7 @@ object EntranceConfiguration {
val CREATOR_IP_SWITCH =
CommonVars("wds.linkis.entrance.user.creator.ip.interceptor.switch", false)
+ val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR =
+ CommonVars("linkis.entrance.auto.clean.dirty.data.enable", true)
+
}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
index 1403a29ed..7bb765634 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
@@ -105,4 +105,7 @@ public interface JobHistoryMapper {
String selectJobHistoryStatusForUpdate(Long jobId);
void updateOberverById(@Param("taskid") Long taskid, @Param("observeInfo") String observeInfo);
+
+ void updateJobHistoryCancelById(
+ @Param("idList") List<Long> idList, @Param("errorDesc") String errorDesc);
}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/errorcode/JobhistoryErrorCodeSummary.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/errorcode/JobhistoryErrorCodeSummary.java
new file mode 100644
index 000000000..5c62944d5
--- /dev/null
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/errorcode/JobhistoryErrorCodeSummary.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.jobhistory.errorcode;
+
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum JobhistoryErrorCodeSummary implements LinkisErrorCode { // jobhistory error codes, each paired with a description
+ UNFINISHED_TASKS(
+ 20020,
+ "entrance service restart, automatically cancel tasks that have not been completed for a long time (24h)");
+
+ /** Numeric error code of this entry. */
+ private final int errorCode;
+ /** Human-readable description of this entry. */
+ private final String errorDesc;
+
+ JobhistoryErrorCodeSummary(int errorCode, String errorDesc) {
+ this.errorCode = errorCode;
+ this.errorDesc = errorDesc;
+ }
+
+ @Override
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public String getErrorDesc() {
+ return errorDesc;
+ }
+}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
index 8ac85a7c4..824d989c6 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
@@ -15,7 +15,7 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-
+
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.linkis.jobhistory.dao.JobHistoryMapper">
@@ -149,7 +149,7 @@
<if test="jobReqId != null">job_req_id = #{jobReqId},</if>
<if test="submitUser != null">submit_user = #{submitUser},</if>
<if test="executeUser != null">execute_user = #{executeUser},</if>
- <if test="source != null">source = #{source},</if>
+ <if test="source != null">`source` = #{source},</if>
<if test="labels != null">labels = #{labels},</if>
<if test="params != null">params = #{params},</if>
<if test="progress != null">progress = #{progress},</if>
@@ -221,4 +221,12 @@
<update id="updateOberverById">
update linkis_ps_job_history_group_history set observe_info = #{observeInfo} where id = #{taskid}
</update>
+
+ <update id="updateJobHistoryCancelById" > <!-- Sets status to 'Cancelled' and error_desc to #{errorDesc} for every id in idList; idList must be non-empty or the IN clause is malformed -->
+ update linkis_ps_job_history_group_history set status = 'Cancelled' ,error_desc = #{errorDesc}
+ WHERE id IN
+ <foreach collection="idList" item="id" separator="," open="(" close=")">
+ #{id}
+ </foreach>
+ </update>
</mapper>
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java
index b23873890..433cbe047 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java
@@ -18,8 +18,10 @@
package org.apache.linkis.jobhistory.service;
import org.apache.linkis.governance.common.entity.job.JobRequest;
+import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest;
import org.apache.linkis.governance.common.protocol.job.*;
import org.apache.linkis.jobhistory.entity.JobHistory;
+import org.apache.linkis.rpc.Sender;
import java.util.ArrayList;
import java.util.Date;
@@ -47,4 +49,6 @@ public interface JobHistoryQueryService {
List<JobRequest> getQueryVOList(List<JobHistory> list);
void changeObserveInfoById(JobHistory jobHistory);
+
+ void clearUndoneTasksByEntranceInstance(EntranceInstanceConfRequest request, Sender sender);
}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index 3512d3fbf..6778276d3 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -26,31 +26,35 @@ import org.apache.linkis.governance.common.entity.job.{
QueryException,
SubJobDetail
}
+import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest
import org.apache.linkis.governance.common.protocol.job._
import org.apache.linkis.jobhistory.conversions.TaskConversions._
import org.apache.linkis.jobhistory.dao.JobHistoryMapper
import org.apache.linkis.jobhistory.entity.{JobHistory, QueryJobHistory}
+import org.apache.linkis.jobhistory.errorcode.JobhistoryErrorCodeSummary
import org.apache.linkis.jobhistory.service.JobHistoryQueryService
import org.apache.linkis.jobhistory.transitional.TaskStatus
import org.apache.linkis.jobhistory.util.QueryUtils
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.message.annotation.Receiver
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.commons.lang3.time.DateUtils
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.{lang, util}
import java.sql.Timestamp
-import java.util.Date
+import java.util.{Calendar, Date}
import java.util.concurrent.{Callable, TimeUnit}
import scala.collection.JavaConverters._
import com.google.common.cache.{Cache, CacheBuilder}
-import com.google.common.collect.Iterables
+import com.google.common.collect.{Iterables, Lists}
@Service
class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
@@ -428,4 +432,33 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
jobHistoryMapper.updateOberverById(jobHistory.getId, jobHistory.getObserveInfo)
}
+ @Receiver
+ override def clearUndoneTasksByEntranceInstance(
+ request: EntranceInstanceConfRequest,
+ sender: Sender
+ ): Unit = { // sender is not used in the body
+ // Search for tasks of request.instance still in WaitForRetry/Inited/Scheduled/Running, using a window from one day ago to now
+ logger.info("Request Entrance Instance :{}", request.instance)
+ val statusList: util.List[String] = new util.ArrayList[String]()
+ statusList.add(TaskStatus.WaitForRetry.toString)
+ statusList.add(TaskStatus.Inited.toString)
+ statusList.add(TaskStatus.Scheduled.toString)
+ statusList.add(TaskStatus.Running.toString)
+ val eDate = new Date(System.currentTimeMillis)
+ val sDate = DateUtils.addDays(eDate, -1)
+ val jobHistoryList =
+ jobHistoryMapper.search(null, null, statusList, sDate, eDate, null, null, request.instance)
+ val idlist = jobHistoryList.asScala.map(_.getId).asJava
+ logger.info("Tasks id will be canceled ids :{}", idlist)
+ // Mark the matched tasks as Cancelled, storing the UNFINISHED_TASKS description as their error_desc
+ val errorMsg = JobhistoryErrorCodeSummary.UNFINISHED_TASKS.getErrorDesc
+ if (!idlist.isEmpty) {
+ if (idlist.size() >= 1000) logger.error("The number of batch modification tasks exceeds 1000") // log only; the update below still runs
+ Lists
+ .partition(idlist, 100) // at most 100 ids per UPDATE statement
+ .asScala
+ .foreach(idlist => jobHistoryMapper.updateJobHistoryCancelById(idlist, errorMsg)) // lambda parameter shadows the outer idlist
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org