You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/24 09:10:00 UTC
[incubator-linkis] branch dev-1.2.0 updated: support cancel hive on tez task and engineconn is no longer busy
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new 9e4b6a7b9 support cancel hive on tez task and engineconn is no longer busy
9e4b6a7b9 is described below
commit 9e4b6a7b94922f0c8c0486c7e0dc88727daecbfd
Author: 张霖 <ba...@live.com>
AuthorDate: Fri Jun 24 17:09:53 2022 +0800
support cancel hive on tez task and engineconn is no longer busy
* fix(linkis-engineplugin-hive): support cancel hive on tez task and engineconn is no longer busy
Closes #2321
---
.../src/main/resources/linkis-engineconn.properties | 5 ++++-
.../hive/conf/HiveEngineConfiguration.scala | 1 +
.../hive/executor/HiveEngineConnExecutor.scala | 20 ++++++++++++++------
3 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
index 4ad1d6700..a2135181f 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
@@ -21,4 +21,7 @@ wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.hive.H
wds.linkis.bdp.hive.init.sql.enable=true
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineplugin.hive.hook.HiveAddJarsEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.HiveUseDatabaseEngineHook,org.apache.linkis.engineconn.computation.executor.hook.HiveInitSQLHook
-wds.linkis.engineconn.maintain.enable=true
\ No newline at end of file
+wds.linkis.engineconn.maintain.enable=true
+
+# Hive execution engine type ("mr" or "tez"). It determines which kill routine is invoked when a running task is cancelled.
+wds.linkis.hive.engine.type=mr
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
index 4db521dfa..ec614291d 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
@@ -25,4 +25,5 @@ object HiveEngineConfiguration {
val ENABLE_FETCH_BASE64 = CommonVars[Boolean]("wds.linkis.hive.enable.fetch.base64",false).getValue
val BASE64_SERDE_CLASS = CommonVars[String]("wds.linkis.hive.base64.serde.class","org.apache.linkis.engineplugin.hive.serde.CustomerDelimitedJSONSerDe").getValue
val HIVE_AUX_JARS_PATH = CommonVars[String]("hive.aux.jars.path", CommonVars[String]("HIVE_AUX_JARS_PATH", "").getValue).getValue
+ val HIVE_ENGINE_TYPE = CommonVars[String]("wds.linkis.hive.engine.type", "mr").getValue
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index f7af9cc24..d20cad578 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -52,9 +52,10 @@ import java.security.PrivilegedExceptionAction
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
-import org.apache.linkis.engineplugin.hive.conf.Counters
+import org.apache.linkis.engineplugin.hive.conf.{Counters, HiveEngineConfiguration}
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.hive.ql.exec.tez.TezJobExecHelper
import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
import org.apache.linkis.governance.common.utils.JobUtils
@@ -446,11 +447,18 @@ class HiveEngineConnExecutor(id: Int,
override def killTask(taskID: String): Unit = {
LOG.info(s"hive begins to kill job with id : ${taskID}")
- HadoopJobExecHelper.killRunningJobs()
- //Utils.tryQuietly(TezJobExecHelper.killRunningJobs())
- Utils.tryQuietly(HiveInterruptUtils.interrupt())
- if (null != thread) {
- Utils.tryAndWarn(thread.interrupt())
+ // The wds.linkis.hive.engine.type parameter selects how the running task is killed ("mr" or "tez").
+ LOG.info(s"hive engine type :${HiveEngineConfiguration.HIVE_ENGINE_TYPE}")
+ HiveEngineConfiguration.HIVE_ENGINE_TYPE match {
+ case "mr" =>
+ HadoopJobExecHelper.killRunningJobs()
+ Utils.tryQuietly(HiveInterruptUtils.interrupt())
+ if (null != thread) {
+ Utils.tryAndWarn(thread.interrupt())
+ }
+ case "tez" =>
+ Utils.tryQuietly(TezJobExecHelper.killRunningJobs())
+ driver.close()
}
clearCurrentProgress()
singleSqlProgressMap.clear()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org