You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/08/11 14:15:01 UTC

[incubator-linkis] branch dev-1.3.0 updated: Refactor exception in CS hook in spark plugin. (#2711)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new ea43a6de9 Refactor exception in CS hook in spark plugin. (#2711)
ea43a6de9 is described below

commit ea43a6de95c4cb090e8db22fc0e4e63eb7d91513
Author: Alexyang <xu...@qq.com>
AuthorDate: Thu Aug 11 22:14:57 2022 +0800

    Refactor exception in CS hook in spark plugin. (#2711)
    
    * #2431
    linkis-engineplugin-spark - refactor SparkPreExecutionHook to raise ExecutorHookFatalException
---
 .../core/exception/EngineConnErrorCode.scala       |  1 +
 .../core/exception/EngineConnFatalException.scala  |  2 ++
 linkis-dist/package/db/linkis_dml.sql              |  1 +
 .../spark/cs/CSSparkPreExecutionHook.scala         | 17 ++++++++++-----
 .../engineplugin/spark/cs/CSTableParser.scala      |  3 +++
 .../spark/executor/SparkEngineConnExecutor.scala   | 25 +++++++++++++++++++++-
 6 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala
index 7bde0cabe..a31844e4d 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnErrorCode.scala
@@ -21,4 +21,5 @@ object EngineConnErrorCode {
 
   val ENGINE_CONN_UN_INIT_CODE = 12100
 
+  val ENGINE_CONN_EXECUTOR_INIT_ERROR = 12101
 }
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala
index 3582b2002..9b237dec7 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/exception/EngineConnFatalException.scala
@@ -22,3 +22,5 @@ import org.apache.linkis.common.exception.FatalException
 class EngineConnFatalException(errorCode: Int, msg: String) extends FatalException(errorCode, msg){
 
 }
+
+case class ExecutorHookFatalException(errorCode: Int, msg: String) extends EngineConnFatalException(errorCode, msg)
\ No newline at end of file
diff --git a/linkis-dist/package/db/linkis_dml.sql b/linkis-dist/package/db/linkis_dml.sql
index 0aedac072..f2d3f8ffc 100644
--- a/linkis-dist/package/db/linkis_dml.sql
+++ b/linkis-dist/package/db/linkis_dml.sql
@@ -418,6 +418,7 @@ INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type)
 INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43012','python save as table未指定格式,默认用parquet保存,hive查询报错','parquet.io.ParquetDecodingException',0);
 INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43013','索引使用错误','IndexError',0);
 INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43014','sql语法有问题','raise ParseException',0);
+INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('43015','当前节点需要的CS表解析失败,请检查当前CSID对应的CS表是否存在','Cannot parse cs table for node',0);
 
 -- 46 importExport
 INSERT INTO linkis_ps_error_code (error_code,error_desc,error_regex,error_type) VALUES ('46001','找不到导入文件地址:%s','java.io.FileNotFoundException: (\\S+) \\(No such file or directory\\)',0);
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala
index fe07e321f..15ef38d61 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSSparkPreExecutionHook.scala
@@ -17,10 +17,12 @@
  
 package org.apache.linkis.engineplugin.spark.cs
 
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.cs.client.utils.ContextServiceUtils
 import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.engineconn.core.exception.{EngineConnErrorCode, ExecutorHookFatalException}
 import org.apache.linkis.engineplugin.spark.extension.SparkPreExecutionHook
+
 import javax.annotation.PostConstruct
 import org.springframework.stereotype.Component
 
@@ -43,12 +45,17 @@ class CSSparkPreExecutionHook extends SparkPreExecutionHook with Logging{
     val contextIDValueStr = ContextServiceUtils.getContextIDStrByMap(engineExecutionContext.getProperties)
     val nodeNameStr = ContextServiceUtils.getNodeNameStrByMap(engineExecutionContext.getProperties)
     logger.info(s"Start to call CSSparkPreExecutionHook,contextID is $contextIDValueStr, nodeNameStr is $nodeNameStr")
-    parsedCode = try {
+    parsedCode = Utils.tryCatch {
       CSTableParser.parse(engineExecutionContext, parsedCode, contextIDValueStr, nodeNameStr)
-    } catch {
+    } {
       case t: Throwable =>
-        logger.info("Failed to parser cs table", t)
-        parsedCode
+        // A caught throwable is never null, but its message can be; keep the fallback text.
+        val msg = Option(t.getMessage).getOrElse("null message")
+        // Interpolate msg: it used to be passed as a format argument without a `{}`
+        // placeholder. Also pass t, so the stack trace is logged as it was before.
+        logger.info(s"Failed to parser cs table because : $msg", t)
+        // This message is matched by the error_regex of error code 43015; keep the text unchanged.
+        throw new ExecutorHookFatalException(EngineConnErrorCode.ENGINE_CONN_EXECUTOR_INIT_ERROR, s"Cannot parse cs table for node : ${nodeNameStr}.")
     }
     logger.info(s"Finished to call CSSparkPreExecutionHook,contextID is $contextIDValueStr, nodeNameStr is $nodeNameStr")
     parsedCode
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
index 29eb8f098..2789d528d 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
@@ -59,6 +59,9 @@ object CSTableParser extends Logging {
     */
   def parse(engineExecutorContext: EngineExecutionContext, code: String, contextIDValueStr: String, nodeNameStr: String): String = {
     val csTempTables = getCSTempTable(code)
+    if (null == csTempTables || csTempTables.isEmpty) {
+      return code
+    }
     val parsedTables = new ArrayBuffer[String]()
     csTempTables.foreach{ csTempTable =>
       val table = getCSTable(csTempTable, contextIDValueStr, nodeNameStr)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 849412f92..60f894197 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
 import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
+import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.spark.common.Kind
 import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
@@ -78,7 +79,21 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
     var preCode = code
     engineExecutorContext.appendStdout(LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}"))
     //Pre-execution hook
-    Utils.tryQuietly(SparkPreExecutionHook.getSparkPreExecutionHooks().foreach(hook => preCode = hook.callPreExecutionHook(engineExecutorContext, preCode)))
+    var executionHook: SparkPreExecutionHook = null
+    Utils.tryCatch {
+      SparkPreExecutionHook.getSparkPreExecutionHooks().foreach { hook =>
+        executionHook = hook
+        preCode = hook.callPreExecutionHook(engineExecutorContext, preCode)
+      }
+    } {
+      // NOTE(review): confirm Utils.tryCatch passes FatalException subclasses to this handler.
+      case fatalException: ExecutorHookFatalException =>
+        logger.error(s"execute preExecution hook : ${getHookName(executionHook)} failed.", fatalException)
+        throw fatalException
+      case e: Exception =>
+        // Best-effort: log the cause (previously dropped) and continue with the current preCode.
+        logger.error(s"execute preExecution hook : ${getHookName(executionHook)} failed.", e)
+    }
     Utils.tryAndWarn(CSSparkHelper.setContextIDInfoToSparkConf(engineExecutorContext, sc))
     val _code = Kind.getRealCode(preCode)
     logger.info(s"Ready to run code with kind $kind.")
@@ -105,6 +120,14 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends C
     this.engineExecutionContext = null
   }
 
+  /**
+   * Resolves a printable name for a pre-execution hook.
+   * @param executeHook the hook to describe; may be null
+   * @return the hook's runtime class name, or "empty hook" when it is null
+   */
+  private def getHookName(executeHook: SparkPreExecutionHook): String =
+    Option(executeHook).map(_.getClass.getName).getOrElse("empty hook")
+
   override def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = {
     val newcode = completedLine + code
     logger.info("newcode is " + newcode)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org