Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/13 05:13:05 UTC

[incubator-linkis] branch dev-1.1.2 updated: EC supports adding taskIDs into the underlying engine's conf (#1942)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 8df1bbac1 EC supports adding taskIDs into the underlying engine's conf (#1942)
8df1bbac1 is described below

commit 8df1bbac1ac6502ca2c010a6000f910a7b8902d4
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Apr 13 13:13:00 2022 +0800

    EC supports adding taskIDs into the underlying engine's conf (#1942)
    
    * Request executor release on Completed and ShuttingDown #1940
    
    * EC supports stuffing taskIDs into the underlying engine's conf #1863
---
 .../computation/executor/conf/ComputationExecutorConf.scala   |  2 ++
 .../executor/service/TaskExecutionServiceImpl.scala           |  5 ++++-
 .../executor/execution/AccessibleEngineConnExecution.scala    |  2 ++
 .../engineplugin/hive/executor/HiveEngineConnExecutor.scala   |  8 +++++++-
 .../engineplugin/spark/executor/SparkEngineConnExecutor.scala | 11 ++++++++++-
 5 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
index 610569a6b..a1ebcf1e0 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
@@ -53,4 +53,6 @@ object ComputationExecutorConf {
   val UPSTREAM_MONITOR_ECTASK_ENTRANCE_THRESHOLD_SEC =  CommonVars("linkis.upstream.monitor.ectask.entrance.threshold.sec", 15).getValue
 
   val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars("hive.resultset.use.unique.column.names", false)
+
+  val JOB_ID_TO_ENV_KEY = CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID").getValue
 }
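
The new ComputationExecutorConf entry above means the environment-variable name used for
the task id defaults to LINKIS_JOB_ID, and the key wds.linkis.ec.job.id.env.key presumably
follows the usual Linkis configuration lookup if a deployment wants a different name. A
minimal Scala sketch of resolving that value (illustrative only, not part of this commit):

    import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf

    // Resolves to "LINKIS_JOB_ID" unless wds.linkis.ec.job.id.env.key is overridden
    // (assumption: the standard CommonVars/linkis.properties lookup applies).
    val envKeyForJobId: String = ComputationExecutorConf.JOB_ID_TO_ENV_KEY
    println(s"Task ids will be recorded under env key: $envKeyForJobId")
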
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 6d5ddb4c5..dfee37c35 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -132,7 +132,10 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
       else false
     }
     val jobId = JobUtils.getJobIdFromMap(requestTask.getProperties)
-    logger.info(s"Received job with id ${jobId}.")
+    if (StringUtils.isNotBlank(jobId)) {
+      System.getenv().put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
+      logger.info(s"Received job with id ${jobId}.")
+    }
     val task = new CommonEngineConnTask(String.valueOf(taskId), retryAble)
     task.setCode(requestTask.getCode)
     task.setProperties(requestTask.getProperties)
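
With the hunk above, the EC records the incoming task id under that key as soon as the
request is received. A minimal sketch of how engine-side or user code might read it back
later; the helper below is illustrative, not part of this commit, and assumes the entry
written via System.getenv().put(...) is visible to later System.getenv lookups:

    import org.apache.commons.lang.StringUtils
    import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf

    // Illustrative helper: fetch the task id recorded for the current process, if any.
    def currentLinkisJobId: Option[String] =
      Option(System.getenv(ComputationExecutorConf.JOB_ID_TO_ENV_KEY))
        .filter(id => StringUtils.isNotBlank(id))
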
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 057088cff..26943e2ca 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -85,9 +85,11 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
         }
         if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
           error(s"${accessibleExecutor.getId} has completed with status ${accessibleExecutor.getStatus}, now stop it.")
+          requestManagerReleaseExecutor("Completed release")
           ShutdownHook.getShutdownHook.notifyStop()
         } else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
           logger.warn(s"${accessibleExecutor.getId} is ShuttingDown...")
+          requestManagerReleaseExecutor(" ShuttingDown release")
           ShutdownHook.getShutdownHook.notifyStop()
         } else if (maxFreeTime > 0 && (NodeStatus.Unlock.equals(accessibleExecutor.getStatus) || NodeStatus.Idle.equals(accessibleExecutor.getStatus))
           && System.currentTimeMillis - accessibleExecutor.getLastActivityTime > maxFreeTime) {
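
The two added calls make the EC ask LinkisManager to release it before its own shutdown
hook fires, on both the Completed and the ShuttingDown paths. A schematic sketch of that
ordering; releaseFromManager and stopSelf are placeholders, not Linkis APIs:

    // Schematic only: report the release reason to the manager first, then stop locally,
    // mirroring the order of the two calls added above.
    def shutDownWithRelease(reason: String,
                            releaseFromManager: String => Unit,
                            stopSelf: () => Unit): Unit = {
      releaseFromManager(reason) // e.g. requestManagerReleaseExecutor("Completed release")
      stopSelf()                 // e.g. ShutdownHook.getShutdownHook.notifyStop()
    }
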
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index 6edd9f3e7..df2840942 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -46,15 +46,16 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.mapred.{JobStatus, RunningJob}
 import org.apache.hadoop.security.UserGroupInformation
 import org.slf4j.LoggerFactory
+
 import java.io.ByteArrayOutputStream
 import java.security.PrivilegedExceptionAction
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
-
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.hive.conf.Counters
 import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
 import org.apache.commons.lang.StringUtils
+import org.apache.linkis.governance.common.utils.JobUtils
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -120,6 +121,11 @@ class HiveEngineConnExecutor(id: Int,
     currentSqlProgress = 0.0f
     val realCode = code.trim()
     LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
+    val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+    if (StringUtils.isNotBlank(jobId)) {
+      LOG.info(s"set mapred.job.name=LINKIS_$jobId")
+      hiveConf.set("mapred.job.name", s"LINKIS_$jobId")
+    }
     if (realCode.trim.length > 500) {
       engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
     } else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
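
The Hive hunk above tags the MapReduce job name with the Linkis task id, so jobs submitted
for a query appear in the resource manager under a recognizable name. A self-contained
sketch of the same setting with a made-up task id (illustrative only, not taken from this
commit):

    import org.apache.hadoop.hive.conf.HiveConf

    val hiveConf = new HiveConf()
    val jobId = "12345"                                // hypothetical task id
    hiveConf.set("mapred.job.name", s"LINKIS_$jobId")  // the MR job then shows up as "LINKIS_12345"
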
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 7ec3f0d01..88f1f2b01 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -17,6 +17,8 @@
  
 package org.apache.linkis.engineplugin.spark.executor
 
+import org.apache.commons.lang3.StringUtils
+
 import java.util
 import java.util.concurrent.atomic.AtomicLong
 import org.apache.linkis.common.log.LogUtils
@@ -28,6 +30,7 @@ import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
 import org.apache.linkis.engineplugin.spark.extension.{SparkPostExecutionHook, SparkPreExecutionHook}
 import org.apache.linkis.engineplugin.spark.utils.JobProgressUtil
 import org.apache.linkis.governance.common.exception.LinkisJobRetryException
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 import org.apache.linkis.manager.common.entity.resource._
 import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
@@ -79,7 +82,13 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
     Utils.tryAndWarn(CSSparkHelper.setContextIDInfoToSparkConf(engineExecutorContext, sc))
     val _code = Kind.getRealCode(preCode)
     info(s"Ready to run code with kind $kind.")
-    jobGroup = String.valueOf("linkis-spark-mix-code-" + queryNum.incrementAndGet())
+    val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+    val jobGroupId = if (StringUtils.isNotBlank(jobId)) {
+      jobId
+    } else {
+      queryNum.incrementAndGet()
+    }
+    jobGroup = String.valueOf("linkis-spark-mix-code-" + jobGroupId)
     //    val executeCount = queryNum.get().toInt - 1
     info("Set jobGroup to " + jobGroup)
     sc.setJobGroup(jobGroup, _code, true)
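
On the Spark side, the job group now carries the Linkis task id when one is present and
falls back to the local counter otherwise. A practical consequence is that all Spark jobs
belonging to one Linkis task can be addressed as a unit; a minimal sketch with a local
SparkContext and an illustrative task id (not part of this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("job-group-demo").setMaster("local[*]"))
    val jobGroup = "linkis-spark-mix-code-12345"   // illustrative: Linkis task id 12345
    sc.setJobGroup(jobGroup, "demo code", interruptOnCancel = true)
    // ... submit jobs here ...
    sc.cancelJobGroup(jobGroup)                    // cancels everything started under this group
    sc.stop()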


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org