You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/13 05:13:05 UTC
[incubator-linkis] branch dev-1.1.2 updated: EC supports add taskIDs into the underlying engine's conf (#1942)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 8df1bbac1 EC supports add taskIDs into the underlying engine's conf (#1942)
8df1bbac1 is described below
commit 8df1bbac1ac6502ca2c010a6000f910a7b8902d4
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Apr 13 13:13:00 2022 +0800
EC supports add taskIDs into the underlying engine's conf (#1942)
* Request executor release from the manager when the EC is Completed or ShuttingDown #1940
* EC supports stuffing taskIDs into the underlying engine's conf #1863
---
.../computation/executor/conf/ComputationExecutorConf.scala | 2 ++
.../executor/service/TaskExecutionServiceImpl.scala | 5 ++++-
.../executor/execution/AccessibleEngineConnExecution.scala | 2 ++
.../engineplugin/hive/executor/HiveEngineConnExecutor.scala | 8 +++++++-
.../engineplugin/spark/executor/SparkEngineConnExecutor.scala | 11 ++++++++++-
5 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
index 610569a6b..a1ebcf1e0 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
@@ -53,4 +53,6 @@ object ComputationExecutorConf {
val UPSTREAM_MONITOR_ECTASK_ENTRANCE_THRESHOLD_SEC = CommonVars("linkis.upstream.monitor.ectask.entrance.threshold.sec", 15).getValue
val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars("hive.resultset.use.unique.column.names", false)
+
+ val JOB_ID_TO_ENV_KEY = CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID").getValue
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 6d5ddb4c5..dfee37c35 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -132,7 +132,10 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
else false
}
val jobId = JobUtils.getJobIdFromMap(requestTask.getProperties)
- logger.info(s"Received job with id ${jobId}.")
+ if (StringUtils.isNotBlank(jobId)) {
+ System.getenv().put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
+ logger.info(s"Received job with id ${jobId}.")
+ }
val task = new CommonEngineConnTask(String.valueOf(taskId), retryAble)
task.setCode(requestTask.getCode)
task.setProperties(requestTask.getProperties)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 057088cff..26943e2ca 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -85,9 +85,11 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
}
if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
error(s"${accessibleExecutor.getId} has completed with status ${accessibleExecutor.getStatus}, now stop it.")
+ requestManagerReleaseExecutor("Completed release")
ShutdownHook.getShutdownHook.notifyStop()
} else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
logger.warn(s"${accessibleExecutor.getId} is ShuttingDown...")
+ requestManagerReleaseExecutor(" ShuttingDown release")
ShutdownHook.getShutdownHook.notifyStop()
} else if (maxFreeTime > 0 && (NodeStatus.Unlock.equals(accessibleExecutor.getStatus) || NodeStatus.Idle.equals(accessibleExecutor.getStatus))
&& System.currentTimeMillis - accessibleExecutor.getLastActivityTime > maxFreeTime) {
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 6edd9f3e7..df2840942 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -46,15 +46,16 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.mapred.{JobStatus, RunningJob}
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory
+
import java.io.ByteArrayOutputStream
import java.security.PrivilegedExceptionAction
import java.util
import java.util.concurrent.atomic.AtomicBoolean
-
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineplugin.hive.conf.Counters
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
import org.apache.commons.lang.StringUtils
+import org.apache.linkis.governance.common.utils.JobUtils
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -120,6 +121,11 @@ class HiveEngineConnExecutor(id: Int,
currentSqlProgress = 0.0f
val realCode = code.trim()
LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
+ val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+ if (StringUtils.isNotBlank(jobId)) {
+ LOG.info(s"set mapred.job.name=LINKIS_$jobId")
+ hiveConf.set("mapred.job.name", s"LINKIS_$jobId")
+ }
if (realCode.trim.length > 500) {
engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 7ec3f0d01..88f1f2b01 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -17,6 +17,8 @@
package org.apache.linkis.engineplugin.spark.executor
+import org.apache.commons.lang3.StringUtils
+
import java.util
import java.util.concurrent.atomic.AtomicLong
import org.apache.linkis.common.log.LogUtils
@@ -28,6 +30,7 @@ import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
import org.apache.linkis.engineplugin.spark.extension.{SparkPostExecutionHook, SparkPreExecutionHook}
import org.apache.linkis.engineplugin.spark.utils.JobProgressUtil
import org.apache.linkis.governance.common.exception.LinkisJobRetryException
+import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.resource._
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
@@ -79,7 +82,13 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
Utils.tryAndWarn(CSSparkHelper.setContextIDInfoToSparkConf(engineExecutorContext, sc))
val _code = Kind.getRealCode(preCode)
info(s"Ready to run code with kind $kind.")
- jobGroup = String.valueOf("linkis-spark-mix-code-" + queryNum.incrementAndGet())
+ val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+ val jobGroupId = if (StringUtils.isNotBlank(jobId)) {
+ jobId
+ } else {
+ queryNum.incrementAndGet()
+ }
+ jobGroup = String.valueOf("linkis-spark-mix-code-" + jobGroupId)
// val executeCount = queryNum.get().toInt - 1
info("Set jobGroup to " + jobGroup)
sc.setJobGroup(jobGroup, _code, true)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org