You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/10 06:10:44 UTC
[incubator-linkis] branch dev-1.2.1 updated: Sub task removed (#2663)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.2.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.1 by this push:
new 06aeae9da Sub task removed (#2663)
06aeae9da is described below
commit 06aeae9daec35fafc56cd846afccbad81d721cbd
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Aug 10 14:10:39 2022 +0800
Sub task removed (#2663)
* remove sub task logical
* remove subJobInfo use
* update kill yarn logical
* remove hook
* remove exception print
* update error code desc max len to 512
---
.../common/conf/GovernaceCommonConf.scala | 2 +
.../EngineResultsetPrefixExecutorHook.scala | 51 -------
.../entrance/job/EntranceExecuteRequest.java | 35 +----
.../linkis/entrance/job/EntranceExecutionJob.java | 164 +--------------------
.../persistence/QueryPersistenceEngine.java | 85 +----------
.../persistence/QueryPersistenceManager.java | 47 +-----
.../linkis/entrance/EntranceWebSocketService.scala | 4 -
.../entrance/execute/DefaultEntranceExecutor.scala | 96 +++---------
.../linkis/entrance/execute/EntranceExecutor.scala | 9 +-
.../linkis/entrance/execute/EntranceJob.scala | 65 +++-----
.../entrance/persistence/PersistenceEngine.scala | 13 +-
.../linkis/entrance/utils/JobHistoryHelper.scala | 30 +---
.../jobhistory/restful/api/QueryRestfulApi.java | 2 -
.../jobhistory/receiver/QueryReceiverChooser.scala | 49 ------
.../impl/JobHistoryDetailQueryServiceImpl.scala | 32 +---
.../service/impl/JobHistoryQueryServiceImpl.scala | 16 +-
16 files changed, 80 insertions(+), 620 deletions(-)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
index 5394c58b6..d7b0d0e80 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
@@ -53,6 +53,8 @@ object GovernanceCommonConf {
val ENGINECONN_ENVKEYS = CommonVars("wds.linkis.engineconn.env.keys", "").getValue
+ val ERROR_CODE_DESC_LEN = CommonVars("linkis.error.code.desc.len", 512, "Error code description maximum length").getValue
+
def getEngineEnvValue(envKey: String): String = {
CommonVars(envKey, "").getValue
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/executor/EngineResultsetPrefixExecutorHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/executor/EngineResultsetPrefixExecutorHook.scala
deleted file mode 100644
index 1212873a3..000000000
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/hook/executor/EngineResultsetPrefixExecutorHook.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineconn.computation.executor.hook.executor
-
-import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.engineconn.common.creation.EngineCreationContext
-import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
-import org.apache.linkis.engineconn.computation.executor.hook.ComputationExecutorHook
-import org.apache.linkis.governance.common.utils.GovernanceConstant
-import org.apache.linkis.server.BDPJettyServerHelper
-
-
-class EngineResultsetPrefixExecutorHook extends ComputationExecutorHook with Logging {
-
- override def getHookName(): String = "EngineResultsetPrefixExecutorHook"
-
- override def beforeExecutorExecute(engineExecutionContext: EngineExecutionContext, engineCreationContext: EngineCreationContext, codeBeforeHook: String): String = {
- val propMap = engineExecutionContext.getProperties
- Utils.tryAndError {
- val resultsetIndex: Int = {
- if (propMap.containsKey(GovernanceConstant.RESULTSET_INDEX)) {
- propMap.get(GovernanceConstant.RESULTSET_INDEX).asInstanceOf[Int]
- } else {
- -1
- }
- }
- if (resultsetIndex >= 0) {
- engineExecutionContext.setResultSetNum(resultsetIndex)
- logger.info(s"Set resultset aliasNum to ${resultsetIndex}")
- } else {
- logger.warn(s"No resultsetIndex found in props : ${BDPJettyServerHelper.gson.toJson(propMap)} \nDefault resultIndex is 0.")
- }
- }
- codeBeforeHook
- }
-}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecuteRequest.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecuteRequest.java
index d1db38d56..38a2359f0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecuteRequest.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecuteRequest.java
@@ -20,11 +20,9 @@ package org.apache.linkis.entrance.job;
import org.apache.linkis.entrance.execute.LabelExecuteRequest;
import org.apache.linkis.entrance.execute.RuntimePropertiesExecuteRequest;
import org.apache.linkis.entrance.execute.UserExecuteRequest;
-import org.apache.linkis.governance.common.entity.job.SubJobInfo;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.scheduler.executer.ExecuteRequest;
import org.apache.linkis.scheduler.executer.JobExecuteRequest;
-import org.apache.linkis.server.BDPJettyServerHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,17 +44,8 @@ public class EntranceExecuteRequest
setJob(job);
}
- private SubJobInfo subJobInfo;
private List<Label<?>> labels;
- public SubJobInfo getSubJobInfo() {
- return subJobInfo;
- }
-
- public void setSubJobInfo(SubJobInfo subJobInfo) {
- this.subJobInfo = subJobInfo;
- }
-
public List<Label<?>> getLabels() {
return labels;
}
@@ -75,34 +64,22 @@ public class EntranceExecuteRequest
private EntranceExecutionJob job;
- public void setExecutionCode(int index) {
- SubJobInfo[] jobGroupInfo = job.getJobGroups();
- if (null != jobGroupInfo && index >= 0 && index < jobGroupInfo.length) {
- subJobInfo = jobGroupInfo[index];
- } else {
- logger.warn(
- "Invalid index : {} in jobRequest : {}. ",
- index,
- BDPJettyServerHelper.gson().toJson(jobGroupInfo));
- }
- }
-
@Override
public String code() {
- if (null != subJobInfo) {
- return subJobInfo.getCode();
+ if (null != job && null != job.getJobRequest()) {
+ return job.getJobRequest().getExecutionCode();
} else {
- logger.error("SubJobInfo is null!");
+ logger.error("JobRequest code is null!");
return null;
}
}
@Override
public String jobId() {
- if (null != subJobInfo && null != subJobInfo.getSubJobDetail()) {
- return String.valueOf(subJobInfo.getSubJobDetail().getId());
+ if (null != job && null != job.getJobRequest()) {
+ return String.valueOf(job.getJobRequest().getId());
} else {
- logger.error("JobDetail is null!");
+ logger.error("job request is null!");
return null;
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
index bd3d681a3..d09411a55 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
@@ -19,7 +19,6 @@ package org.apache.linkis.entrance.job;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.ByteTimeUtils;
-import org.apache.linkis.entrance.exception.EntranceErrorCode;
import org.apache.linkis.entrance.exception.EntranceErrorException;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.log.LogHandler;
@@ -30,19 +29,13 @@ import org.apache.linkis.entrance.log.WebSocketLogWriter;
import org.apache.linkis.entrance.persistence.PersistenceManager;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
-import org.apache.linkis.governance.common.entity.job.SubJobDetail;
-import org.apache.linkis.governance.common.entity.job.SubJobInfo;
import org.apache.linkis.governance.common.protocol.task.RequestTask$;
-import org.apache.linkis.governance.common.utils.GovernanceConstant;
import org.apache.linkis.manager.label.entity.Label;
-import org.apache.linkis.manager.label.entity.entrance.BindEngineLabel;
import org.apache.linkis.orchestrator.plans.ast.QueryParams$;
import org.apache.linkis.protocol.constants.TaskConstant;
-import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.protocol.utils.TaskUtils;
import org.apache.linkis.scheduler.executer.ExecuteRequest;
import org.apache.linkis.scheduler.queue.JobInfo;
-import org.apache.linkis.scheduler.queue.SchedulerEventState;
import org.apache.commons.io.IOUtils;
@@ -52,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -69,7 +61,6 @@ public class EntranceExecutionJob extends EntranceJob implements LogHandler {
private WebSocketLogWriter webSocketLogWriter;
private static final Logger logger = LoggerFactory.getLogger(EntranceExecutionJob.class);
private PersistenceManager persistenceManager;
- private int runningIndex = 0;
public EntranceExecutionJob(PersistenceManager persistenceManager) {
this.persistenceManager = persistenceManager;
@@ -121,59 +112,9 @@ public class EntranceExecutionJob extends EntranceJob implements LogHandler {
@Override
public void init() throws EntranceErrorException {
- List<EntranceErrorException> errList = new ArrayList<>();
- SubJobInfo[] subJobInfos =
- Arrays.stream(getCodeParser().parse(getJobRequest().getExecutionCode()))
- .map(
- code -> {
- SubJobInfo subJobInfo = new SubJobInfo();
- // todo don't need whole jobRequest, but need executeUser
- subJobInfo.setJobReq(getJobRequest());
- subJobInfo.setStatus(SchedulerEventState.Inited().toString());
- subJobInfo.setCode(code);
- // persist and update jobDetail
- SubJobDetail subJobDetail = createNewJobDetail();
- subJobInfo.setSubJobDetail(subJobDetail);
- subJobInfo.setProgress(0.0f);
- subJobDetail.setExecutionContent(code);
- subJobDetail.setJobGroupId(getJobRequest().getId());
- subJobDetail.setStatus(SchedulerEventState.Inited().toString());
- subJobDetail.setCreatedTime(
- new Date(System.currentTimeMillis()));
- subJobDetail.setUpdatedTime(
- new Date(System.currentTimeMillis()));
- try {
- persistenceManager
- .createPersistenceEngine()
- .persist(subJobInfo);
- } catch (Exception e1) {
- errList.add(
- new EntranceErrorException(
- EntranceErrorCode.INIT_JOB_ERROR
- .getErrCode(),
- "Init subjob error, please submit it again(任务初始化失败,请稍后重试). "
- + e1.getMessage()));
- }
- return subJobInfo;
- })
- .toArray(SubJobInfo[]::new);
- if (errList.size() > 0) {
- logger.error(errList.get(0).getDesc());
- throw errList.get(0);
- }
- setJobGroups(subJobInfos);
updateNewestAccessByClientTimestamp();
}
- @Override
- public SubJobInfo getRunningSubJob() {
- if (runningIndex < getJobGroups().length) {
- return getJobGroups()[runningIndex];
- } else {
- return null;
- }
- }
-
@Override
public ExecuteRequest jobToExecuteRequest() throws EntranceErrorException {
// add resultSet path root
@@ -212,71 +153,9 @@ public class EntranceExecutionJob extends EntranceJob implements LogHandler {
runtimeMapOri.put(QueryParams$.MODULE$.JOB_KEY(), jobMap);
EntranceExecuteRequest executeRequest = new EntranceExecuteRequest(this);
- boolean isCompleted = true;
- boolean isHead = false;
- boolean isTail = false;
- if (null != jobGroups() && jobGroups().length > 0) {
- for (int i = 0; i < jobGroups().length; i++) {
- if (null != jobGroups()[i].getSubJobDetail()) {
- SubJobDetail subJobDetail = jobGroups()[i].getSubJobDetail();
- if (SchedulerEventState.isCompletedByStr(subJobDetail.getStatus())) {
- continue;
- } else {
- isCompleted = false;
- executeRequest.setExecutionCode(i);
- runningIndex = i;
- subJobDetail.setPriority(i);
- break;
- }
- } else {
- throw new EntranceErrorException(
- EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode(),
- "Subjob was not inited, please submit again.");
- }
- }
- if (0 == runningIndex) {
- isHead = true;
- } else {
- isHead = false;
- }
- if (runningIndex >= jobGroups().length - 1) {
- isTail = true;
- } else {
- isTail = false;
- }
- } else {
- isHead = true;
- isTail = true;
- }
- BindEngineLabel bindEngineLabel =
- new BindEngineLabel()
- .setJobGroupId(getJobRequest().getId().toString())
- .setIsJobGroupHead(String.valueOf(isHead))
- .setIsJobGroupEnd(String.valueOf(isTail));
- if (isHead) {
- jobMap.put(GovernanceConstant.RESULTSET_INDEX(), 0);
- setResultSize(0);
- } else {
- jobMap.put(GovernanceConstant.RESULTSET_INDEX(), addAndGetResultSize(0));
- }
- List<Label<?>> labels = new ArrayList<Label<?>>();
- labels.addAll(getJobRequest().getLabels());
- labels.add(bindEngineLabel);
+ List<Label<?>> labels = new ArrayList<Label<?>>(getJobRequest().getLabels());
executeRequest.setLabels(labels);
- if (isCompleted) {
- return null;
- } else {
- return executeRequest;
- }
- }
-
- private SubJobDetail createNewJobDetail() {
- SubJobDetail subJobDetail = new SubJobDetail();
- subJobDetail.setUpdatedTime(subJobDetail.getCreatedTime());
- subJobDetail.setJobGroupId(getJobRequest().getId());
- subJobDetail.setStatus(SchedulerEventState.Scheduled().toString());
- subJobDetail.setJobGroupInfo("");
- return subJobDetail;
+ return executeRequest;
}
@Override
@@ -370,43 +249,4 @@ public class EntranceExecutionJob extends EntranceJob implements LogHandler {
logger.warn("Close logWriter and logReader failed. {}", e.getMessage(), e);
}
}
-
- @Override
- public float getProgress() {
- float progress = super.getProgress();
- SubJobInfo[] subJobInfoArray;
- if (progress < 1.0 && (subJobInfoArray = getJobGroups()).length > 0) {
- int groupCount = subJobInfoArray.length;
- float progressValueSum = 0.0f;
- for (SubJobInfo subJobInfo : subJobInfoArray) {
- progressValueSum += subJobInfo.getProgress();
- }
- return progressValueSum / (float) groupCount;
- }
- return progress;
- }
-
- /**
- * // The front end needs to obtain data //if (EntranceJob.JOB_COMPLETED_PROGRESS() ==
- * getProgress()) { // return new JobProgressInfo[0]; //}
- *
- * @return
- */
- @Override
- public JobProgressInfo[] getProgressInfo() {
- SubJobInfo[] subJobInfoArray = getJobGroups();
- if (subJobInfoArray.length > 0) {
- List<JobProgressInfo> progressInfoList = new ArrayList<>();
- for (SubJobInfo subJobInfo : subJobInfoArray) {
- progressInfoList.addAll(subJobInfo.getProgressInfoMap().values());
- }
- return progressInfoList.toArray(new JobProgressInfo[] {});
- }
- return super.getProgressInfo();
- }
-
- @Override
- public int getRunningSubJobIndex() {
- return runningIndex;
- }
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
index a6ef1382c..3f8fac8ea 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceEngine.java
@@ -23,12 +23,15 @@ import org.apache.linkis.entrance.conf.EntranceConfiguration$;
import org.apache.linkis.entrance.exception.EntranceIllegalParamException;
import org.apache.linkis.entrance.exception.EntranceRPCException;
import org.apache.linkis.entrance.exception.QueryFailedException;
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
import org.apache.linkis.governance.common.entity.job.JobRequest;
-import org.apache.linkis.governance.common.entity.job.SubJobDetail;
-import org.apache.linkis.governance.common.entity.job.SubJobInfo;
-import org.apache.linkis.governance.common.entity.task.*;
-import org.apache.linkis.governance.common.protocol.job.*;
+import org.apache.linkis.governance.common.entity.task.RequestPersistTask;
+import org.apache.linkis.governance.common.entity.task.RequestReadAllTask;
+import org.apache.linkis.governance.common.entity.task.ResponsePersist;
+import org.apache.linkis.governance.common.protocol.job.JobReqInsert;
+import org.apache.linkis.governance.common.protocol.job.JobReqUpdate;
+import org.apache.linkis.governance.common.protocol.job.JobRespProtocol;
import org.apache.linkis.protocol.constants.TaskConstant;
import org.apache.linkis.protocol.message.RequestProtocol;
import org.apache.linkis.protocol.task.Task;
@@ -50,7 +53,7 @@ public class QueryPersistenceEngine extends AbstractPersistenceEngine {
private Sender sender;
private static final Logger logger = LoggerFactory.getLogger(QueryPersistenceEngine.class);
- private static final int MAX_DESC_LEN = 320;
+ private static final int MAX_DESC_LEN = GovernanceCommonConf.ERROR_CODE_DESC_LEN();
private static final int RETRY_NUMBER =
EntranceConfiguration.JOBINFO_UPDATE_RETRY_MAX_TIME().getValue();
@@ -66,30 +69,6 @@ public class QueryPersistenceEngine extends AbstractPersistenceEngine {
.getValue());
}
- @Override
- public void persist(SubJobInfo subJobInfo)
- throws QueryFailedException, EntranceIllegalParamException {
- if (null == subJobInfo || null == subJobInfo.getSubJobDetail()) {
- throw new EntranceIllegalParamException(
- 20004, "JobDetail can not be null, unable to do persist operation");
- }
- JobDetailReqInsert jobReqInsert = new JobDetailReqInsert(subJobInfo);
- JobRespProtocol jobRespProtocol =
- sendToJobHistoryAndRetry(
- jobReqInsert, "subJobInfo of job" + subJobInfo.getJobReq().getId());
- if (jobRespProtocol != null) {
- Map<String, Object> data = jobRespProtocol.getData();
- Object object = data.get(JobRequestConstants.JOB_ID());
- if (object == null) {
- throw new QueryFailedException(
- 20011, "Insert jobDetail failed, reason: " + jobRespProtocol.getMsg());
- }
- String jobIdStr = object.toString();
- Long jobId = Long.parseLong(jobIdStr);
- subJobInfo.getSubJobDetail().setId(jobId);
- }
- }
-
private JobRespProtocol sendToJobHistoryAndRetry(RequestProtocol jobReq, String msg)
throws QueryFailedException {
JobRespProtocol jobRespProtocol = null;
@@ -158,33 +137,6 @@ public class QueryPersistenceEngine extends AbstractPersistenceEngine {
jobReqUpdate, "job:" + jobReq.getReqId() + "status:" + jobReq.getStatus());
}
- @Override
- public SubJobDetail retrieveJobDetailReq(Long jobDetailId)
- throws EntranceIllegalParamException, EntranceRPCException {
-
- if (jobDetailId == null || jobDetailId < 0) {
- throw new EntranceIllegalParamException(20003, "taskID can't be null or less than 0");
- }
- SubJobDetail subJobDetail = new SubJobDetail();
- subJobDetail.setId(jobDetailId);
- JobDetailReqQuery jobDetailReqQuery = new JobDetailReqQuery(subJobDetail);
- ResponseOneJobDetail responseOneJobDetail = null;
- try {
- responseOneJobDetail = (ResponseOneJobDetail) sender.ask(jobDetailReqQuery);
- if (null != responseOneJobDetail) {
- return responseOneJobDetail.jobDetail();
- }
- } catch (Exception e) {
- logger.error(
- "Requesting the corresponding jobDetail failed with jobDetailId: {}(通过jobDetailId: {} 请求相应的task失败)",
- jobDetailId,
- jobDetailId,
- e);
- throw new EntranceRPCException(20020, "sender rpc failed", e);
- }
- return null;
- }
-
@Override
public void persist(JobRequest jobReq) throws ErrorException {
if (null == jobReq) {
@@ -206,27 +158,6 @@ public class QueryPersistenceEngine extends AbstractPersistenceEngine {
}
}
- @Override
- public void updateIfNeeded(SubJobInfo subJobInfo)
- throws QueryFailedException, EntranceIllegalParamException {
- if (null == subJobInfo || null == subJobInfo.getSubJobDetail()) {
- throw new EntranceIllegalParamException(
- 20004, "task can not be null, unable to do update operation");
- }
- JobDetailReqUpdate jobDetailReqUpdate = new JobDetailReqUpdate(subJobInfo);
- jobDetailReqUpdate
- .jobInfo()
- .getSubJobDetail()
- .setUpdatedTime(new Date(System.currentTimeMillis()));
- JobRespProtocol jobRespProtocol =
- sendToJobHistoryAndRetry(
- jobDetailReqUpdate,
- "jobDetail:"
- + subJobInfo.getSubJobDetail().getId()
- + "status:"
- + subJobInfo.getStatus());
- }
-
@Override
public Task[] readAll(String instance)
throws EntranceIllegalParamException, EntranceRPCException, QueryFailedException {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index 1f71e198d..a2920f89d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -18,20 +18,15 @@
package org.apache.linkis.entrance.persistence;
import org.apache.linkis.common.exception.ErrorException;
-import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.entrance.EntranceContext;
import org.apache.linkis.entrance.cli.heartbeat.CliHeartbeatMonitor;
import org.apache.linkis.entrance.cs.CSEntranceHelper;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.log.FlexibleErrorCodeManager;
import org.apache.linkis.governance.common.entity.job.JobRequest;
-import org.apache.linkis.governance.common.entity.job.SubJobInfo;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.scheduler.executer.OutputExecuteResponse;
import org.apache.linkis.scheduler.queue.Job;
-import org.apache.linkis.server.BDPJettyServerHelper;
-
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,47 +80,7 @@ public class QueryPersistenceManager extends PersistenceManager {
}
@Override
- public void onResultSetCreated(Job job, OutputExecuteResponse response) {
- String path;
- try {
- path = createResultSetEngine().persistResultSet(job, response);
- } catch (Exception e) {
- String msg =
- "Persist resultSet failed for subJob : "
- + job.getId()
- + ", response : "
- + BDPJettyServerHelper.gson().toJson(response);
- logger.error(msg);
- if (null != job) {
- job.onFailure("persist resultSet failed!", e);
- } else {
- logger.error("Cannot find job : {} in cache of ExecutorManager.", job.getId(), e);
- }
- return;
- }
- if (StringUtils.isNotBlank(path) && job instanceof EntranceJob) {
- EntranceJob entranceJob = (EntranceJob) job;
- SubJobInfo subJobInfo = entranceJob.getRunningSubJob();
- String resultLocation =
- entranceJob.getRunningSubJob().getSubJobDetail().getResultLocation();
- if (StringUtils.isEmpty(resultLocation)) {
- synchronized (subJobInfo.getSubJobDetail()) {
- // todo check
- if (StringUtils.isNotEmpty(subJobInfo.getSubJobDetail().getResultLocation())) {
- return;
- }
- try {
- subJobInfo
- .getSubJobDetail()
- .setResultLocation(new FsPath(path).getSchemaPath());
- createPersistenceEngine().updateIfNeeded(subJobInfo);
- } catch (Exception e) {
- entranceContext.getOrCreateLogManager().onLogUpdate(job, e.toString());
- }
- }
- }
- }
- }
+ public void onResultSetCreated(Job job, OutputExecuteResponse response) {}
@Override
public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progressInfo) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
index 496785cad..6c2fb9101 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala
@@ -145,10 +145,6 @@ class EntranceWebSocketService extends ServerEventService with EntranceEventList
LogUtils.generateInfo(s"Your job's execution code is (after variable substitution and code check) "))
entranceServer.getEntranceContext.getOrCreateLogManager().onLogUpdate(job,
"************************************SCRIPT CODE************************************")
- val jobGroups = job.asInstanceOf[EntranceJob].getJobGroups
- jobGroups.foreach(subJobInfo => {
- entranceServer.getEntranceContext.getOrCreateLogManager().onLogUpdate(job, subJobInfo.getCode)
- })
entranceServer.getEntranceContext.getOrCreateLogManager().onLogUpdate(job,
"************************************SCRIPT CODE************************************")
entranceServer.getEntranceContext.getOrCreateLogManager().onLogUpdate(job,
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index f298e6020..97cec44f1 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -25,7 +25,6 @@ import org.apache.linkis.entrance.job.EntranceExecuteRequest
import org.apache.linkis.entrance.orchestrator.EntranceOrchestrationFactory
import org.apache.linkis.entrance.utils.JobHistoryHelper
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
-import org.apache.linkis.governance.common.entity.job.SubJobInfo
import org.apache.linkis.governance.common.protocol.task.ResponseTaskStatus
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
@@ -41,25 +40,18 @@ import org.apache.linkis.orchestrator.execution.{ArrayResultSetTaskResponse, Fai
import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.scheduler.executer._
-import org.apache.linkis.scheduler.queue.SchedulerEventState
import org.apache.linkis.server.BDPJettyServerHelper
import java.util
import java.util.Date
-
class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager: EntranceExecutorManager) extends EntranceExecutor(id, mark) with SingleTaskOperateSupport with Logging {
- /* private def doMethod[T](exec: String => T): T = if (engineReturns.isEmpty)
- throw new EntranceErrorException(20001, s"Engine${id} could not find a job in RUNNING state(Engine${id}找不到处于RUNNING状态的Job)")
- else exec(engineReturns(0).execId)*/
-
-
/**
- *1. get logProcessor by log operate
- *2. update log by logListener
+ * 1. get logProcessor by log operate
+ * 2. update log by logListener
*
* @param orchestratorFuture
*/
@@ -68,7 +60,6 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
logProcessor.registerLogNotify(logEvent => {
if (null != job) {
job.getLogListener.foreach(_.onLogUpdate(job, logEvent.log))
- job.getJobGroups
}
})
logProcessor
@@ -78,62 +69,29 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
val progressProcessor = orchestratorFuture.operate[ProgressProcessor](DefaultProgressOperation.PROGRESS_NAME)
progressProcessor.doOnObtain(progressInfoEvent => {
if (null != entranceJob) {
- val jobGroups = entranceJob.getJobGroups
- if (jobGroups.length > 0) {
- val subJobInfo = entranceJob.getRunningSubJob
- if (null != subJobInfo) {
- //Update progress value
- subJobInfo.setProgress(progressInfoEvent.progress)
- val runningIndex = entranceJob.getRunningSubJobIndex
- val jobGroupSize = jobGroups.length
- if (runningIndex >= 0 && runningIndex <= jobGroupSize -1) {
- val totalProgress = 1.0 * (runningIndex + progressInfoEvent.progress) / jobGroupSize
- //Update progress info
- if (null != progressInfoEvent.progressInfo) {
- progressInfoEvent.progressInfo.foreach(progressInfo =>
- subJobInfo.getProgressInfoMap.put(progressInfo.id, progressInfo))
- }
- entranceJob.getProgressListener.foreach(_.onProgressUpdate(entranceJob, totalProgress.toFloat,
- entranceJob.getProgressInfo))
- } else {
- logger.error("Invalid runningIndex.")
- }
- }
- } else {
- entranceJob.getProgressListener.foreach(_.onProgressUpdate(entranceJob, progressInfoEvent.progress,
- entranceJob.getProgressInfo))
- }
- JobHistoryHelper.updateJobRequestMetrics(entranceJob.getJobRequest, progressInfoEvent.resourceMap, progressInfoEvent.infoMap)
+ entranceJob.getProgressListener.foreach(_.onProgressUpdate(entranceJob, progressInfoEvent.progress,
+ entranceJob.getProgressInfo))
+ JobHistoryHelper.updateJobRequestMetrics(entranceJob.getJobRequest, progressInfoEvent.resourceMap, progressInfoEvent.infoMap)
}
})
progressProcessor
}
- def searchJobGroupInProgress(jobGroups: Array[SubJobInfo]): SubJobInfo = {
- for (jobGroup <- jobGroups) {
- if (jobGroup.getProgress > 0 && jobGroup.getProgress < 1.0) {
- return jobGroup
- }
- }
- null
- }
def dealResponse(orchestrationResponse: OrchestrationResponse, entranceExecuteRequest: EntranceExecuteRequest, orchestration: Orchestration): Unit = {
orchestrationResponse match {
case succeedResponose: SucceedTaskResponse =>
succeedResponose match {
case resultSetResp: ResultSetTaskResponse =>
- logger.info(s"SubJob : ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId} succeed to execute task, and get result.")
- // todo check null alias
- entranceExecuteRequest.getJob.asInstanceOf[EntranceJob].addAndGetResultSize(0)
+ logger.info(s"JobRequest : ${entranceExecuteRequest.jobId()} succeed to execute task, and get result.")
entranceExecuteRequest.getJob.getEntranceContext.getOrCreatePersistenceManager()
.onResultSetCreated(entranceExecuteRequest.getJob, AliasOutputExecuteResponse(null, resultSetResp.getResultSet))
//
case arrayResultSetPathResp: ArrayResultSetTaskResponse =>
- logger.info(s"SubJob : ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId} succeed to execute task, and get result array.")
+ logger.info(s"JobRequest : ${entranceExecuteRequest.jobId()} succeed to execute task, and get result array.")
if (null != arrayResultSetPathResp.getResultSets && arrayResultSetPathResp.getResultSets.length > 0) {
val resultsetSize = arrayResultSetPathResp.getResultSets.length
- entranceExecuteRequest.getSubJobInfo.getSubJobDetail.setResultSize(resultsetSize)
+ entranceExecuteRequest.getJob.setResultSize(resultsetSize)
entranceExecuteRequest.getJob.asInstanceOf[EntranceJob].addAndGetResultSize(resultsetSize)
}
val firstResultSet = arrayResultSetPathResp.getResultSets.headOption.orNull
@@ -154,39 +112,24 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
}
}
case _ =>
- logger.info(s"SubJob : ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId} succeed to execute task,no result.")
- }
- entranceExecuteRequest.getSubJobInfo.setStatus(SchedulerEventState.Succeed.toString)
- entranceExecuteRequest.getJob.getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(entranceExecuteRequest.getSubJobInfo)
- entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateInfo(s"Your subjob : ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail().getId} execue with state succeed, has ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail().getResultSize} resultsets.")))
- // submit next subJob
- val executeRequest = entranceExecuteRequest.getJob.jobToExecuteRequest()
- if (null != executeRequest) {
- // clear subjob cache
- callExecute(executeRequest)
- } else {
- entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateInfo(s"Congratuaions! Your job : ${entranceExecuteRequest.getJob.getId} executed with status succeed and ${entranceExecuteRequest.getJob.addAndGetResultSize(0)} results.")))
- Utils.tryAndWarn(doOnSucceed(entranceExecuteRequest))
+ logger.info(s"JobRequest : ${entranceExecuteRequest.jobId()} succeed to execute task,no result.")
}
+ entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateInfo(s"Congratuaions! Your job : ${entranceExecuteRequest.getJob.getId} executed with status succeed and ${entranceExecuteRequest.getJob.addAndGetResultSize(0)} results.")))
+ Utils.tryAndWarn(doOnSucceed(entranceExecuteRequest))
case failedResponse: FailedTaskResponse =>
- entranceExecuteRequest.getSubJobInfo.setStatus(SchedulerEventState.Failed.toString)
- entranceExecuteRequest.getJob.getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(entranceExecuteRequest.getSubJobInfo)
-
Utils.tryAndWarn {
doOnFailed(entranceExecuteRequest, orchestration, failedResponse)
}
- case o =>
- val msg = s"Job : ${entranceExecuteRequest.getJob.getId} , subJob : ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId} returnd unknown response : ${BDPJettyServerHelper.gson.toJson(o)}"
+ case _ =>
+ val msg = s"Job : ${entranceExecuteRequest.getJob.getId} , JobRequest id: ${entranceExecuteRequest.jobId()} returnd unknown response}"
logger.error(msg)
entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateERROR(msg)))
- // todo
}
}
def requestToComputationJobReq(entranceExecuteRequest: EntranceExecuteRequest): JobReq = {
val jobReqBuilder = ComputationJobReq.newBuilder()
- val subJobId = String.valueOf(entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId)
- jobReqBuilder.setId(subJobId)
+ jobReqBuilder.setId(entranceExecuteRequest.jobId())
jobReqBuilder.setSubmitUser(entranceExecuteRequest.submitUser())
jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
val codeTypeLabel: Label[_] = LabelUtil.getCodeTypeLabel(entranceExecuteRequest.getLabels)
@@ -232,14 +175,12 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
}
-
override def pause(): Boolean = {
//TODO
true
}
-
override def resume(): Boolean = {
//TODO
true
@@ -261,7 +202,7 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
// 2. orchestrate compJobReq get Orchestration
val orchestration = EntranceOrchestrationFactory.getOrchestrationSession().orchestrate(compJobReq)
val orchestratorFuture = orchestration.asyncExecute()
- val msg = s"Job with jobGroupId : ${entranceExecuteRequest.getJob.getJobRequest.getId} and subJobId : ${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId} was submitted to Orchestrator."
+ val msg = s"JobRequest (${entranceExecuteRequest.jobId()}) was submitted to Orchestrator."
logger.info(msg)
entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateInfo(msg)))
@@ -284,21 +225,20 @@ class DefaultEntranceExecutor(id: Long, mark: MarkReq, entranceExecutorManager:
getEngineExecuteAsyncReturn.foreach(_.closeOrchestration())
getEngineExecuteAsyncReturn.get
} else {
- logger.info(s"For job ${entranceExecuteRequest.getJob.getId} and subJob ${compJobReq.getId} to create EngineExecuteAsyncReturn")
+ logger.info(s"For job ${entranceExecuteRequest.jobId()} and orchestrator task id ${compJobReq.getId} to create EngineExecuteAsyncReturn")
new EngineExecuteAsyncReturn(request, null)
}
jobReturn.setOrchestrationObjects(orchestratorFuture, logProcessor, progressAndResourceProcessor)
- jobReturn.setSubJobId(compJobReq.getId)
setEngineReturn(jobReturn)
jobReturn
} { t: Throwable =>
if (getEngineExecuteAsyncReturn.isEmpty) {
- val msg = s"task(${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId}) submit failed, reason, ${ExceptionUtils.getMessage(t)}"
+ val msg = s"JobRequest (${entranceExecuteRequest.jobId()}) submit failed, reason, ${ExceptionUtils.getMessage(t)}"
entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateERROR(ExceptionUtils.getStackTrace(t))))
ErrorExecuteResponse(msg, t)
} else {
- val msg = s"task(${entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId}) submit failed, reason, ${ExceptionUtils.getMessage(t)}"
+ val msg = s"JobRequest (${entranceExecuteRequest.jobId()}) submit failed, reason, ${ExceptionUtils.getMessage(t)}"
val failedResponse = new DefaultFailedTaskResponse(msg, EntranceErrorCode.SUBMIT_JOB_ERROR.getErrCode, t)
doOnFailed(entranceExecuteRequest, null, failedResponse)
null
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index d3b0a11aa..156f17081 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -136,9 +136,6 @@ class EngineExecuteAsyncReturn(val request: ExecuteRequest,
private var progressProcessor: ProgressProcessor = _
- private var resourceReportProcessor : ResourceReportProcessor = _
-
- private var subJobId: String = _
def getLastNotifyTime: Long = lastNotifyTime
@@ -165,9 +162,6 @@ class EngineExecuteAsyncReturn(val request: ExecuteRequest,
getProgressProcessor().foreach(IOUtils.closeQuietly(_))
}
- def setSubJobId(subJobId: String): Unit = {
- this.subJobId = subJobId
- }
private[execute] def notifyStatus(responseEngineStatus: ResponseTaskStatus): Unit = {
lastNotifyTime = System.currentTimeMillis()
@@ -183,7 +177,7 @@ class EngineExecuteAsyncReturn(val request: ExecuteRequest,
r match {
case ErrorExecuteResponse(errorMsg, error) =>
val errorStackTrace = if (error != null) ExceptionUtils.getStackTrace(error) else StringUtils.EMPTY
- val msg = s"Job with execId-$id + subJobId : $subJobId execute failed,$errorMsg \n $errorStackTrace"
+ val msg = s"jobRequest($id) execute failed,$errorMsg \n ${errorStackTrace}"
entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateERROR(msg)))
case _ =>
}
@@ -232,6 +226,5 @@ class EngineExecuteAsyncReturn(val request: ExecuteRequest,
override def notify(rs: ExecuteResponse => Unit): Unit = {
notifyJob = rs
- // this synchronized notify()
}
}
\ No newline at end of file
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index 509d3e0cd..d12b83fd5 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -24,7 +24,7 @@ import org.apache.linkis.entrance.EntranceContext
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.event._
import org.apache.linkis.entrance.exception.EntranceErrorException
-import org.apache.linkis.governance.common.entity.job.{JobRequest, SubJobInfo}
+import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.paser.CodeParser
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -43,21 +43,23 @@ abstract class EntranceJob extends Job {
@BeanProperty
var creator: String = _
+
@BeanProperty
var user: String = _
+
@BeanProperty
var params: util.Map[String, Any] = new util.HashMap[String, Any](1)
+
@BeanProperty
var jobRequest: JobRequest = _
- @BeanProperty
- var jobGroups: Array[SubJobInfo] = new Array[SubJobInfo](0)
+
@BeanProperty
var codeParser: CodeParser = _
private var entranceListenerBus: Option[EntranceEventListenerBus[EntranceEventListener, EntranceEvent]] = None
private var progressInfo: Array[JobProgressInfo] = Array.empty
private val persistedResultSets = new AtomicInteger(0)
- // private var resultSize = -1
+
private var entranceContext: EntranceContext = _
/**
@@ -65,7 +67,7 @@ abstract class EntranceJob extends Job {
* Can be used to monitor client status.
* e.g. server can detect if linkis-cli process has abnormally ended then kill the job
* */
- private val newestAccessByClientTimestamp: AtomicLong = new AtomicLong(-1l) //volatile
+ private val newestAccessByClientTimestamp: AtomicLong = new AtomicLong(-1L)
def setEntranceListenerBus(entranceListenerBus: EntranceEventListenerBus[EntranceEventListener, EntranceEvent]): Unit =
this.entranceListenerBus = Option(entranceListenerBus)
@@ -85,18 +87,7 @@ abstract class EntranceJob extends Job {
newestAccessByClientTimestamp.set(newTime)
}
- def getRunningSubJobIndex: Int
-
- def getRunningSubJob: SubJobInfo = {
- if (null != jobGroups && jobGroups.size > 0) {
- jobGroups(0)
- } else {
- null
- }
- }
-
def setResultSize(resultSize: Int): Unit = {
- // this.resultSize = resultSize
if (resultSize >= 0) {
persistedResultSets.set(resultSize)
}
@@ -121,34 +112,19 @@ abstract class EntranceJob extends Job {
override def beforeStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {
- // if (SchedulerEventState.isCompleted(toState) && (resultSize < 0 || persistedResultSets.get() < resultSize)) {
- /*val startWaitForPersistedTime = System.currentTimeMillis
- persistedResultSets synchronized {
- while ((resultSize < 0 || persistedResultSets.get() < resultSize) && getErrorResponse == null && !isWaitForPersistedTimeout(startWaitForPersistedTime))
- persistedResultSets.wait(3000)
- }
- if (isWaitForPersistedTimeout(startWaitForPersistedTime)) onFailure("persist resultSets timeout!", new EntranceErrorException(20305, "persist resultSets timeout!"))
- if (isSucceed && getErrorResponse != null) {
- val _toState = if (getErrorResponse.t == null) Cancelled else Failed
- transition(_toState)
- return
- }*/
- // }
super.beforeStateChanged(fromState, toState)
}
override def afterStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {
- //updateJobRequestStatus(toState.toString)
super.afterStateChanged(fromState, toState)
toState match {
case Scheduled =>
- //Entrance指标:任务排队结束时间
- if(getJobRequest.getMetrics == null){
+ if (getJobRequest.getMetrics == null) {
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateWarn("Job Metrics has not been initialized.")))
- }else{
- if(getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)){
+ } else {
+ if (getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)) {
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateWarn("Your job has already been scheduled before.")))
- }else{
+ } else {
getJobRequest.getMetrics.put(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME, new Date(System.currentTimeMillis))
}
}
@@ -157,24 +133,26 @@ abstract class EntranceJob extends Job {
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("Your job is turn to retry. Please wait it to schedule.")))
case Running =>
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("Your job is Running now. Please wait it to complete.")))
- //TODO job start event
+
case _ if SchedulerEventState.isCompleted(toState) =>
endTime = System.currentTimeMillis()
- //Entrance指标,任务完成时间
+
getJobRequest.getMetrics.put(TaskConstant.ENTRANCEJOB_COMPLETE_TIME, new Date(System.currentTimeMillis()))
if (getJobInfo != null) getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(getJobInfo.getMetric)))
- if (isSucceed)
+ if (isSucceed) {
getLogListener.foreach(_.onLogUpdate(this,
LogUtils.generateInfo("Congratulations. Your job completed with status Success.")))
- else getLogListener.foreach(_.onLogUpdate(this,
- LogUtils.generateInfo(s"Sorry. Your job completed with a status $toState. You can view logs for the reason.")))
+ } else {
+ getLogListener.foreach(_.onLogUpdate(this,
+ LogUtils.generateInfo(s"Sorry. Your job completed with a status $toState. You can view logs for the reason.")))
+ }
this.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS)
entranceListenerBus.foreach(_.post(EntranceProgressEvent(this, EntranceJob.JOB_COMPLETED_PROGRESS, this.getProgressInfo)))
this.getProgressListener.foreach(listener => listener.onProgressUpdate(this, EntranceJob.JOB_COMPLETED_PROGRESS, Array[JobProgressInfo]()))
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(getJobRequest)
case _ =>
}
- entranceListenerBus.foreach(_.post(EntranceJobEvent(this.getId)))
+ entranceListenerBus.foreach(_.post(EntranceJobEvent(this.getId())))
}
override def onFailure(errorMsg: String, t: Throwable): Unit = {
@@ -213,18 +191,17 @@ abstract class EntranceJob extends Job {
(if (RPCUtils.isReceiverNotExists(errorExecuteResponse.t)) {
getExecutor match {
case e: EntranceExecutor =>
- // val instance = e.getInstance.getInstance
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateSystemWarn(s"Since the submitted engine rejects the connection, the system will automatically retry and exclude the engine.(由于提交的引擎拒绝连接,系统将自动进行重试,并排除引擎.)")))
case _ =>
}
true
} else super.isJobShouldRetry(errorExecuteResponse))
- def operation[T](operate: EntranceExecutor => T ): T = {
+ def operation[T](operate: EntranceExecutor => T): T = {
this.getExecutor match {
case entranceExecutor: EntranceExecutor =>
operate(entranceExecutor)
- case _ => throw new EntranceErrorException(10000, "Unsupported operation (不支持的操作)")
+ case _ => throw new EntranceErrorException(10000, "Unsupported operation")
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/persistence/PersistenceEngine.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/persistence/PersistenceEngine.scala
index e841b149d..477f70e0a 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/persistence/PersistenceEngine.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/persistence/PersistenceEngine.scala
@@ -17,11 +17,12 @@
package org.apache.linkis.entrance.persistence
-import java.io.{Closeable, Flushable}
import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.governance.common.entity.job.{JobRequest, SubJobDetail, SubJobInfo}
+import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.protocol.task.Task
+import java.io.{Closeable, Flushable}
+
trait PersistenceEngine extends Closeable with Flushable {
@@ -33,8 +34,7 @@ trait PersistenceEngine extends Closeable with Flushable {
@throws[ErrorException]
def persist(jobReq: JobRequest): Unit
- @throws[ErrorException]
- def persist(subjobInfo: SubJobInfo): Unit
+
/**
* If a task's progress, status, logs, and result set are updated, this method is updated <br>
@@ -44,8 +44,6 @@ trait PersistenceEngine extends Closeable with Flushable {
@throws[ErrorException]
def updateIfNeeded(jobReq: JobRequest): Unit
- @throws[ErrorException]
- def updateIfNeeded(subJobInfo: SubJobInfo): Unit
/**
* Used to hang up a unified import task through this method, and continue to do the processing.
@@ -65,7 +63,4 @@ trait PersistenceEngine extends Closeable with Flushable {
@throws[ErrorException]
def retrieveJobReq(jobGroupId: java.lang.Long): JobRequest
- @throws[ErrorException]
- def retrieveJobDetailReq(jobDetailId: java.lang.Long): SubJobDetail
-
}
\ No newline at end of file
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index 7105a80c6..7473892db 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -17,27 +17,27 @@
package org.apache.linkis.entrance.utils
+import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.JobHistoryFailedException
import org.apache.linkis.entrance.execute.EntranceJob
import org.apache.linkis.governance.common.constant.job.JobRequestConstants
-import org.apache.linkis.governance.common.entity.job.{JobRequest, SubJobDetail, SubJobInfo}
+import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.protocol.job._
+import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
+import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.query.cache.{CacheTaskResult, RequestReadCache}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.queue.SchedulerEventState
+import sun.net.util.IPAddressUtil
import java.util
import java.util.Date
import javax.servlet.http.HttpServletRequest
-import org.apache.commons.lang3.StringUtils
-import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
-import org.apache.linkis.protocol.constants.TaskConstant
-import sun.net.util.IPAddressUtil
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
object JobHistoryHelper extends Logging {
@@ -83,12 +83,6 @@ object JobHistoryHelper extends Logging {
* @param taskID
*/
def forceKill(taskID: Long): Unit = {
- val subJobInfo = new SubJobInfo
- val subJobDetail = new SubJobDetail
- subJobDetail.setId(taskID)
- subJobDetail.setStatus(SchedulerEventState.Cancelled.toString)
- subJobInfo.setSubJobDetail(subJobDetail)
- val jobDetailReqUpdate = JobDetailReqUpdate(subJobInfo)
val jobRequest = new JobRequest
jobRequest.setId(taskID)
jobRequest.setStatus(SchedulerEventState.Cancelled.toString)
@@ -96,7 +90,6 @@ object JobHistoryHelper extends Logging {
jobRequest.setUpdatedTime(new Date(System.currentTimeMillis()))
val jobReqUpdate = JobReqUpdate(jobRequest)
sender.ask(jobReqUpdate)
- sender.ask(jobDetailReqUpdate)
}
/**
@@ -105,15 +98,8 @@ object JobHistoryHelper extends Logging {
* @param taskIdList
*/
def forceBatchKill(taskIdList: util.ArrayList[java.lang.Long]): Unit = {
- val subJobInfoList = new util.ArrayList[SubJobInfo]()
val jobReqList = new util.ArrayList[JobRequest]()
- taskIdList.foreach(taskID => {
- val subJobInfo = new SubJobInfo
- val subJobDetail = new SubJobDetail
- subJobDetail.setId(taskID)
- subJobDetail.setStatus(SchedulerEventState.Cancelled.toString)
- subJobInfo.setSubJobDetail(subJobDetail)
- subJobInfoList.add(subJobInfo)
+ taskIdList.asScala.foreach(taskID => {
val jobRequest = new JobRequest
jobRequest.setId(taskID)
jobRequest.setStatus(SchedulerEventState.Cancelled.toString)
@@ -121,9 +107,7 @@ object JobHistoryHelper extends Logging {
jobRequest.setUpdatedTime(new Date(System.currentTimeMillis()))
jobReqList.add(jobRequest)
})
- val jobDetailReqBatchUpdate = JobDetailReqBatchUpdate(subJobInfoList)
val jobReqBatchUpdate = JobReqBatchUpdate(jobReqList)
- sender.ask(jobDetailReqBatchUpdate)
sender.ask(jobReqBatchUpdate)
}
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
index 75893113e..e98811721 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/restful/api/QueryRestfulApi.java
@@ -22,7 +22,6 @@ import org.apache.linkis.governance.common.entity.job.QueryException;
import org.apache.linkis.jobhistory.cache.impl.DefaultQueryCacheManager;
import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration;
import org.apache.linkis.jobhistory.conversions.TaskConversions;
-import org.apache.linkis.jobhistory.dao.JobDetailMapper;
import org.apache.linkis.jobhistory.entity.*;
import org.apache.linkis.jobhistory.service.JobHistoryQueryService;
import org.apache.linkis.jobhistory.util.QueryUtils;
@@ -59,7 +58,6 @@ public class QueryRestfulApi {
private Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired private JobHistoryQueryService jobHistoryQueryService;
- @Autowired private JobDetailMapper jobDetailMapper;
@Autowired private DefaultQueryCacheManager queryCacheManager;
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/receiver/QueryReceiverChooser.scala b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/receiver/QueryReceiverChooser.scala
deleted file mode 100644
index 36a6bfa64..000000000
--- a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/receiver/QueryReceiverChooser.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-//package org.apache.linkis.jobhistory.receiver
-
-/*import org.apache.linkis.jobhistory.cache.QueryCacheService
-import org.apache.linkis.jobhistory.service.JobHistoryQueryService
-import org.apache.linkis.protocol.query.QueryProtocol
-import org.apache.linkis.rpc.{RPCMessageEvent, Receiver, ReceiverChooser}
-
-import javax.annotation.PostConstruct
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Component*/
-
-/**
-
-@Component
-class QueryReceiverChooser extends ReceiverChooser {
-
- @Autowired
- private var queryService: JobHistoryQueryService = _
- @Autowired
- private var queryCacheService: QueryCacheService = _
- private var receiver: Option[QueryReceiver] = _
-
- @PostConstruct
- def init(): Unit = receiver = Some(new QueryReceiver(queryService, queryCacheService))
-
- override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = event.message match {
- case _: QueryProtocol => receiver
- case _ => None
- }
-}
-*/
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDetailQueryServiceImpl.scala b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDetailQueryServiceImpl.scala
index 2813b8f25..d7c9faf80 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDetailQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryDetailQueryServiceImpl.scala
@@ -38,7 +38,7 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters.asScalaBufferConverter
-@Service
+
class JobHistoryDetailQueryServiceImpl extends JobHistoryDetailQueryService with Logging {
@Autowired
@@ -188,36 +188,6 @@ class JobHistoryDetailQueryServiceImpl extends JobHistoryDetailQueryService with
jobResp
}
- /*private def queryTaskList2RequestPersistTaskList(queryTask: java.util.List[QueryTask]): java.util.List[RequestPersistTask] = {
- import scala.collection.JavaConversions._
- val tasks = new util.ArrayList[RequestPersistTask]
- import org.apache.linkis.jobhistory.conversions.TaskConversions.queryTask2RequestPersistTask
- queryTask.foreach(f => tasks.add(f))
- tasks
- }*/
-
-
-
- /*override def getJobDetailByIdAndName(jobDetailId: java.lang.Long, userName: String): QueryJobDetail = {
- val jobHistory = new JobHistory
- jobHistory.set
- val jobReq = new JobDetail
- jobReq.setId(jobId)
- jobReq.setSub(userName)
- val jobHistoryList = jobDetailMapper.selectJobDetail(jobReq)
- if (jobHistoryList.isEmpty) null else jobHistoryList.get(0)
- }*/
-
- /*override def search(jobId: java.lang.Long, username: String, status: String, sDate: Date, eDate: Date): util.List[QueryJobDetail] = {
- import scala.collection.JavaConversions._
- val split: util.List[String] = if (status != null) status.split(",").toList else null
- jobDetailMapper.search(jobId, username, split, sDate, eDate, null)
- }*/
-
- /*override def getQueryVOList(list: java.util.List[QueryJobDetail]): java.util.List[JobRequest] = {
- jobHistory2JobRequest(list)
- }*/
-
private def shouldUpdate(oldStatus: String, newStatus: String): Boolean = TaskStatus.valueOf(oldStatus).ordinal <= TaskStatus.valueOf(newStatus).ordinal
}
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index dcdb90ec5..aec8fac24 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -22,9 +22,11 @@ import com.google.common.collect.Iterables
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.constant.job.JobRequestConstants
import org.apache.linkis.governance.common.entity.job.{JobRequest, JobRequestWithDetail, QueryException, SubJobDetail}
import org.apache.linkis.governance.common.protocol.job._
+import org.apache.linkis.jobhistory.conf.JobhistoryConfiguration
import org.apache.linkis.jobhistory.conversions.TaskConversions._
import org.apache.linkis.jobhistory.dao.{JobDetailMapper, JobHistoryMapper}
import org.apache.linkis.jobhistory.entity.{JobHistory, QueryJobHistory}
@@ -48,8 +50,6 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
@Autowired
private var jobHistoryMapper: JobHistoryMapper = _
- @Autowired
- private var jobDetailMapper: JobDetailMapper = _
private val unDoneTaskCache: Cache[String, Integer] = CacheBuilder.newBuilder().concurrencyLevel(5)
.expireAfterWrite(1, TimeUnit.MINUTES)
@@ -88,9 +88,9 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
val jobResp = new JobRespProtocol
Utils.tryCatch {
if (jobReq.getErrorDesc != null) {
- if (jobReq.getErrorDesc.length > 256) {
+ if (jobReq.getErrorDesc.length > GovernanceCommonConf.ERROR_CODE_DESC_LEN) {
logger.info(s"errorDesc is too long,we will cut some message")
- jobReq.setErrorDesc(jobReq.getErrorDesc.substring(0, 256))
+ jobReq.setErrorDesc(jobReq.getErrorDesc.substring(0, GovernanceCommonConf.ERROR_CODE_DESC_LEN))
logger.info(s"${jobReq.getErrorDesc}")
}
}
@@ -134,9 +134,9 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
val jobResp = new JobRespProtocol
Utils.tryCatch {
if (jobReq.getErrorDesc != null) {
- if (jobReq.getErrorDesc.length > 256) {
+ if (jobReq.getErrorDesc.length > GovernanceCommonConf.ERROR_CODE_DESC_LEN) {
logger.info(s"errorDesc is too long,we will cut some message")
- jobReq.setErrorDesc(jobReq.getErrorDesc.substring(0, 256))
+ jobReq.setErrorDesc(jobReq.getErrorDesc.substring(0, GovernanceCommonConf.ERROR_CODE_DESC_LEN))
logger.info(s"${jobReq.getErrorDesc}")
}
}
@@ -183,7 +183,9 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
val tasksWithDetails = new util.ArrayList[JobRequestWithDetail]
task.asScala.foreach(job => {
val subJobDetails = new util.ArrayList[SubJobDetail]()
- jobDetailMapper.selectJobDetailByJobHistoryId(job.getId).asScala.foreach(job => subJobDetails.add(jobdetail2SubjobDetail(job)))
+ val subJobDetail = new SubJobDetail
+ subJobDetail.setResultLocation(job.getResultLocation)
+ subJobDetails.add(subJobDetail)
tasksWithDetails.add(new JobRequestWithDetail(jobHistory2JobRequest(job)).setSubJobDetailList(subJobDetails))
})
val map = new util.HashMap[String, Object]()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org