You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/08/11 14:15:01 UTC
[incubator-linkis] branch dev-1.3.0 updated: Refactor exception in CS hook in spark plugin. (#2711)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new ea43a6de9 Refactor exception in CS hook in spark plugin. (#2711)
ea43a6de9 is described below
commit ea43a6de95c4cb090e8db22fc0e4e63eb7d91513
Author: Alexyang <xu...@qq.com>
AuthorDate: Thu Aug 11 22:14:57 2022 +0800
Refactor exception in CS hook in spark plugin. (#2711)
* #2431
linkis-engineplugin-spark - refactor SparkPreExecutionHook to raise ExecutorHookFatalException
---
.../core/exception/EngineConnErrorCode.scala | 1 +
.../core/exception/EngineConnFatalException.scala | 2 ++
linkis-dist/package/db/linkis_dml.sql | 1 +
.../spark/cs/CSSparkPreExecutionHook.scala | 17 ++++++++++-----
.../engineplugin/spark/cs/CSTableParser.scala | 3 +++
.../spark/executor/SparkEngineConnExecutor.scala | 25 +++++++++++++++++++++-
6 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala
index 7bde0cabe..a31844e4d 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala
@@ -21,4 +21,5 @@ object EngineConnErrorCode {
val ENGINE_CONN_UN_INIT_CODE = 12100
+ val ENGINE_CONN_EXECUTOR_INIT_ERROR = 12101
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala
index 3582b2002..9b237dec7 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala
@@ -22,3 +22,5 @@ import org.apache.linkis.common.exception.FatalException
class EngineConnFatalException(errorCode: Int, msg: String) extends FatalException(errorCode, msg){
}
+
+case class ExecutorHookFatalException(errorCode: Int, msg: String) extends EngineConnFatalException(errorCode, msg)
\ No newline at end of file
diff --git a/linkis-dist/package/db/linkis_dml.sql b/linkis-dist/package/db/linkis_dml.sql
index 0aedac072..f2d3f8ffc 100644
--- a/linkis-dist/package/db/linkis_dml.sql
+++ b/linkis-dist/package/db/linkis_dml.sql
@@ -418,6 +418,7 @@ INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type)
INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43012','python save as table未指定格式,默认用parquet保存,hive查询报错','parquet.io.ParquetDecodingException',0);
INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43013','索引使用错误','IndexError',0);
INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43014','sql语法有问题','raise ParseException',0);
+INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43015','当前节点需要的CS表解析失败,请检查当前CSID对应的CS表是否存在','Cannot parse cs table for node',0);
-- 46 importExport
INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('46001','找不到导入文件地址:%s','java.io.FileNotFoundException: (\\S+) \\(No such file or directory\\)',0);
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala
index fe07e321f..15ef38d61 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala
@@ -17,10 +17,12 @@
package org.apache.linkis.engineplugin.spark.cs
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.cs.client.utils.ContextServiceUtils
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.engineconn.core.exception.{EngineConnErrorCode, ExecutorHookFatalException}
import org.apache.linkis.engineplugin.spark.extension.SparkPreExecutionHook
+
import javax.annotation.PostConstruct
import org.springframework.stereotype.Component
@@ -43,12 +45,17 @@ class CSSparkPreExecutionHook extends SparkPreExecutionHook with Logging{
val contextIDValueStr = ContextServiceUtils.getContextIDStrByMap(engineExecutionContext.getProperties)
val nodeNameStr = ContextServiceUtils.getNodeNameStrByMap(engineExecutionContext.getProperties)
logger.info(s"Start to call CSSparkPreExecutionHook,contextID is $contextIDValueStr, nodeNameStr is $nodeNameStr")
- parsedCode = try {
+ parsedCode = Utils.tryCatch {
CSTableParser.parse(engineExecutionContext, parsedCode, contextIDValueStr, nodeNameStr)
- } catch {
+ } {
case t: Throwable =>
- logger.info("Failed to parser cs table", t)
- parsedCode
+ val msg = if (null != t) {
+ t.getMessage
+ } else {
+ "null message"
+ }
+ logger.info("Failed to parser cs table because : ", msg)
+ throw new ExecutorHookFatalException(EngineConnErrorCode.ENGINE_CONN_EXECUTOR_INIT_ERROR, s"Cannot parse cs table for node : ${nodeNameStr}.")
}
logger.info(s"Finished to call CSSparkPreExecutionHook,contextID is $contextIDValueStr, nodeNameStr is $nodeNameStr")
parsedCode
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
index 29eb8f098..2789d528d 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
@@ -59,6 +59,9 @@ object CSTableParser extends Logging {
*/
def parse(engineExecutorContext: EngineExecutionContext, code: String, contextIDValueStr: String, nodeNameStr: String): String = {
val csTempTables = getCSTempTable(code)
+ if (null == csTempTables || csTempTables.isEmpty) {
+ return code
+ }
val parsedTables = new ArrayBuffer[String]()
csTempTables.foreach{ csTempTable =>
val table = getCSTable(csTempTable, contextIDValueStr, nodeNameStr)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 849412f92..60f894197 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
+import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineplugin.spark.common.Kind
import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
@@ -78,7 +79,21 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
var preCode = code
engineExecutorContext.appendStdout(LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}"))
//Pre-execution hook
- Utils.tryQuietly(SparkPreExecutionHook.getSparkPreExecutionHooks().foreach(hook => preCode = hook.callPreExecutionHook(engineExecutorContext, preCode)))
+ var executionHook: SparkPreExecutionHook = null
+ Utils.tryCatch {
+ SparkPreExecutionHook.getSparkPreExecutionHooks().foreach(hook => {
+ executionHook = hook
+ preCode = hook.callPreExecutionHook(engineExecutorContext, preCode)
+ })
+ } {
+ case fatalException: ExecutorHookFatalException =>
+ val hookName = getHookName(executionHook)
+ logger.error(s"execute preExecution hook : ${hookName} failed.")
+ throw fatalException
+ case e: Exception =>
+ val hookName = getHookName(executionHook)
+ logger.error(s"execute preExecution hook : ${hookName} failed.")
+ }
Utils.tryAndWarn(CSSparkHelper.setContextIDInfoToSparkConf(engineExecutorContext, sc))
val _code = Kind.getRealCode(preCode)
logger.info(s"Ready to run code with kind $kind.")
@@ -105,6 +120,14 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
this.engineExecutionContext = null
}
+ private def getHookName(executeHook: SparkPreExecutionHook): String = {
+ if (null == executeHook) {
+ "empty hook"
+ } else {
+ executeHook.getClass.getName
+ }
+ }
+
override def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = {
val newcode = completedLine + code
logger.info("newcode is " + newcode)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org