You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/08 12:13:38 UTC

[incubator-linkis] branch dev-1.3.1 updated: [linkis-engineplugin-spark] Modification of scala file floating red (#3194)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 5cf593eaa [linkis-engineplugin-spark] Modification of scala file floating red (#3194)
5cf593eaa is described below

commit 5cf593eaa01654655316870ed8ad31cc9c5e81f5
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 20:13:34 2022 +0800

    [linkis-engineplugin-spark] Modification of scala file floating red (#3194)
---
 .../spark/Interpreter/Interpreter.scala            |  2 +-
 .../spark/Interpreter/ProcessInterpreter.scala     |  4 +-
 .../spark/Interpreter/PythonInterpreter.scala      |  8 ++--
 .../args/SparkPythonArgsPreExecutionHook.scala     |  4 +-
 .../engineplugin/spark/common/LogContainer.scala   |  6 +--
 .../engineplugin/spark/common/SparkKind.scala      |  4 +-
 .../engineplugin/spark/cs/CSTableParser.scala      |  1 +
 .../engineplugin/spark/executor/SQLSession.scala   | 11 ++++--
 .../spark/executor/SparkEngineConnExecutor.scala   | 42 ++++++++------------
 .../spark/executor/SparkPythonExecutor.scala       | 45 ++++++++--------------
 .../spark/executor/SparkScalaExecutor.scala        |  3 +-
 .../spark/factory/SparkEngineConnFactory.scala     | 14 +++----
 .../engineplugin/spark/imexport/CsvRelation.scala  | 19 ++++-----
 .../engineplugin/spark/imexport/ExportData.scala   |  6 +--
 .../engineplugin/spark/imexport/LoadData.scala     | 31 +++++++++------
 .../spark/mdq/MDQPostExecutionHook.scala           |  6 ++-
 .../spark/mdq/MDQPreExecutionHook.scala            |  8 +++-
 .../engineplugin/spark/utils/EngineUtils.scala     |  2 +-
 .../spark/sql/execution/datasources/csv/UDF.scala  |  1 +
 19 files changed, 106 insertions(+), 111 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
index 7dd4ff6db..f6d6c797e 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
@@ -36,7 +36,7 @@ trait Interpreter {
 
   @throws(classOf[TimeoutException])
   @throws(classOf[InterruptedException])
-  final def waitForStateChange(oldState: State, atMost: Duration) = {
+  final def waitForStateChange(oldState: State, atMost: Duration): Unit = {
     Utils.waitUntil({ () => state != oldState }, atMost)
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
index f703a3b6d..171d48e4f 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
@@ -87,6 +87,7 @@ abstract class ProcessInterpreter(process: Process) extends Interpreter with Log
   Future {
     val exitCode = process.waitFor()
     if (exitCode != 0) {
+      // scalastyle:off println
       errOut.lines.foreach(println)
       println(getClass.getSimpleName + " has stopped with exit code " + process.exitValue)
       _state = Error()
@@ -115,8 +116,7 @@ abstract class ProcessInterpreter(process: Process) extends Interpreter with Log
     IOUtils.closeQuietly(stdin)
     IOUtils.closeQuietly(stdout)
     errOut.close
-
-    // Give ourselves 10 seconds to tear down the process.
+    // scalastyle:off awaitresult
     Utils.tryFinally(Await.result(future, Duration(10, TimeUnit.SECONDS))) {
       process.destroy()
     }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
index a7824b7ff..f6f5a7d67 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
@@ -66,7 +66,7 @@ object PythonInterpreter {
     new PythonInterpreter(process, gatewayServer)
   }
 
-  def pythonPath = {
+  def pythonPath: String = {
     val pythonPath = new ArrayBuffer[String]
     val pythonHomePath = new File(SparkConfiguration.SPARK_HOME.getValue, "python").getPath
     val pythonParentPath = new File(pythonHomePath, "lib")
@@ -156,6 +156,7 @@ private class PythonInterpreter(process: Process, gatewayServer: GatewayServer)
     val iterable = initOut.iterator
     while (continue && iterable.hasNext) {
       iterable.next match {
+        // scalastyle:off println
         case "READY" => println("Start python application succeed."); continue = false
         case str: String => println(str)
         case _ =>
@@ -182,16 +183,15 @@ private class PythonInterpreter(process: Process, gatewayServer: GatewayServer)
   }
 
   private def sendRequest(request: Map[String, Any]): Option[JValue] = {
+    // scalastyle:off println
     stdin.println(Serialization.write(request))
     stdin.flush()
 
     Option(stdout.readLine()).map { line => parse(line) }
   }
 
-  def pythonPath = {
+  def pythonPath: String = {
     val pythonPath = new ArrayBuffer[String]
-    //    sys.env.get("SPARK_HOME").foreach { sparkHome =>
-    //    }
     val pythonHomePath = new File(SparkConfiguration.SPARK_HOME.getValue, "python").getPath
     val pythonParentPath = new File(pythonHomePath, "lib")
     pythonPath += pythonHomePath
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala
index 6313b9885..fd4a9723b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala
@@ -64,7 +64,9 @@ class SparkPythonArgsPreExecutionHook extends SparkPreExecutionHook with Logging
         engineExecutionContext.getTotalParagraph != 1 || StringUtils.isEmpty(
           runType
         ) || !StringUtils.equals(RunType.PYSPARK.toString, runType)
-    ) return code
+    ) {
+      return code
+    }
 
     val argsArr =
       if (
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala
index e476b4edf..698e52a3b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala
@@ -39,11 +39,11 @@ class LogContainer(val logSize: Int) {
     }
   }
 
-  def putLogs(logs: Iterable[String]) = synchronized {
+  def putLogs(logs: Iterable[String]): Unit = synchronized {
     logs.foreach(putLog)
   }
 
-  def reset() = synchronized {
+  def reset(): Unit = synchronized {
     flag = 0
     tail = 0
   }
@@ -64,7 +64,7 @@ class LogContainer(val logSize: Int) {
     }
   }
 
-  def size = {
+  def size: Int = {
     if (flag == tail) 0
     else if (flag > tail) tail + logSize - flag
     else tail - flag
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
index 67fcd9c7e..6ba8b210c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
@@ -44,12 +44,12 @@ object SparkKind {
     getKind(Kind.getKind(code))
   }
 
-  def getSessionKind(code: String) = {
+  def getSessionKind(code: String): Kind with Product with Serializable = {
     val kindStr = Kind.getKindString(code)
     if (kindStr.indexOf("@") == 0) getKind(kindStr.substring(1)) else SparkMix()
   }
 
-  private def getKind(kindStr: String) = {
+  private def getKind(kindStr: String): Kind with Product with Serializable = {
     kindStr match {
       case SPARKSCALA_TYPE | SCALA_LAN => SparkScala()
       case PYSPARK_TYPE | PYTHON_LAN | PYTHON_END => PySpark()
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
index 4a8127164..a862a45b3 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
@@ -71,6 +71,7 @@ object CSTableParser extends Logging {
     csTempTables.foreach { csTempTable =>
       val table = getCSTable(csTempTable, contextIDValueStr, nodeNameStr)
       if (null == table) {
+        // scalastyle:off throwerror
         throw new ExecuteError(40007, s"The csTable that name is $csTempTable not found in cs")
       }
       registerTempTable(table)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
index d259d8824..1201665c5 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import java.text.NumberFormat
+import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -97,16 +98,20 @@ object SQLSession extends Logging {
     // val columnsSet = dataFrame.schema
     val columns = columnsSet
       .map(c =>
-        Column(c.name, DataType.toDataType(c.dataType.typeName.toLowerCase), c.getComment().orNull)
+        Column(
+          c.name,
+          DataType.toDataType(c.dataType.typeName.toLowerCase(Locale.getDefault())),
+          c.getComment().orNull
+        )
       )
       .toArray[Column]
     columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
     if (columns == null || columns.isEmpty) return
     val metaData = new TableMetaData(columns)
     val writer =
-      if (StringUtils.isNotBlank(alias))
+      if (StringUtils.isNotBlank(alias)) {
         engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE, alias)
-      else engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+      } else engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
     writer.addMetaData(metaData)
     var index = 0
     Utils.tryThrow({
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 19a7d546f..35cf6801a 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -159,25 +159,25 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
     executeLine(engineExecutorContext, newcode)
   }
 
-  override def progress(taskID: String): Float = if (
-      jobGroup == null || engineExecutionContext.getTotalParagraph == 0
-  ) ProgressUtils.getOldProgress(this.engineExecutionContext)
-  else {
-    val newProgress =
-      (engineExecutionContext.getCurrentParagraph * 1f - 1f) / engineExecutionContext.getTotalParagraph + JobProgressUtil
-        .progress(sc, jobGroup) / engineExecutionContext.getTotalParagraph
-    val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress
-    val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
-    if (normalizedProgress < oldProgress) oldProgress
-    else {
-      ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext)
-      normalizedProgress
+  override def progress(taskID: String): Float =
+    if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) {
+      ProgressUtils.getOldProgress(this.engineExecutionContext)
+    } else {
+      val newProgress =
+        (engineExecutionContext.getCurrentParagraph * 1f - 1f) / engineExecutionContext.getTotalParagraph + JobProgressUtil
+          .progress(sc, jobGroup) / engineExecutionContext.getTotalParagraph
+      val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress
+      val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
+      if (normalizedProgress < oldProgress) oldProgress
+      else {
+        ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext)
+        normalizedProgress
+      }
     }
-  }
 
-  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = if (jobGroup == null)
+  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = if (jobGroup == null) {
     Array.empty
-  else {
+  } else {
     logger.debug("request new progress info for jobGroup is " + jobGroup)
     val progressInfoArray = ArrayBuffer[JobProgressInfo]()
     progressInfoArray ++= JobProgressUtil.getActiveJobProgressInfo(sc, jobGroup)
@@ -220,20 +220,10 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
   override def getCurrentNodeResource(): NodeResource = {
     logger.info("Begin to get actual used resources!")
     Utils.tryCatch({
-      //      val driverHost: String = sc.getConf.get("spark.driver.host")
-      //      val executorMemList = sc.getExecutorMemoryStatus.filter(x => !x._1.split(":")(0).equals(driverHost)).map(x => x._2._1)
       val executorNum: Int = sc.getConf.get("spark.executor.instances").toInt
       val executorMem: Long =
         ByteTimeUtils.byteStringAsBytes(sc.getConf.get("spark.executor.memory")) * executorNum
-
-      //      if(executorMemList.size>0) {
-      //        executorMem = executorMemList.reduce((x, y) => x + y)
-      //      }
       val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sc.getConf.get("spark.driver.memory"))
-      //      val driverMemList = sc.getExecutorMemoryStatus.filter(x => x._1.split(":")(0).equals(driverHost)).map(x => x._2._1)
-      //      if(driverMemList.size > 0) {
-      //          driverMem = driverMemList.reduce((x, y) => x + y)
-      //      }
       val sparkExecutorCores = sc.getConf.get("spark.executor.cores", "2").toInt * executorNum
       val sparkDriverCores = sc.getConf.get("spark.driver.cores", "1").toInt
       val queue = sc.getConf.get("spark.yarn.queue")
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index f2719343f..0cf862d04 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -73,8 +73,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
   private val lineOutputStream = new RsOutputStream
   val sqlContext = sparkEngineSession.sqlContext
   val SUCCESS = "success"
-  /*@throws(classOf[IOException])
-  override def open = {}*/
   private lazy val py4jToken: String = RandomStringUtils.randomAlphanumeric(256)
 
   private lazy val gwBuilder: GatewayServerBuilder = {
@@ -91,11 +89,11 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     }
   }
 
-  def getSparkConf = sc.getConf
+  def getSparkConf: org.apache.spark.SparkConf = sc.getConf
 
-  def getJavaSparkContext = new JavaSparkContext(sc)
+  def getJavaSparkContext: JavaSparkContext = new JavaSparkContext(sc)
 
-  def getSparkSession = if (sparkSession != null) sparkSession
+  def getSparkSession: Object = if (sparkSession != null) sparkSession
   else () => throw new IllegalAccessException("not supported keyword spark in spark1.x versions")
 
   override def init(): Unit = {
@@ -136,7 +134,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
   override def getKind: Kind = PySpark()
 
   private def initGateway = {
-    //  如果从前端获取到用户所设置的Python版本为Python3 则取Python3的环境变量,否则默认为Python2
+    //  If the python version set by the user is obtained from the front end as python3, the environment variable of python3 is taken; otherwise, the default is python2
     logger.info(
       s"spark.python.version => ${engineCreationContext.getOptions.get("spark.python.version")}"
     )
@@ -168,7 +166,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     )
     val pythonClasspath = new StringBuilder(pythonPath)
 
-    //
     val files = sc.getConf.get("spark.files", "")
     logger.info(s"output spark files ${files}")
     if (StringUtils.isNotEmpty(files)) {
@@ -231,9 +228,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
       //      close
       Utils.tryFinally({
         if (promise != null && !promise.isCompleted) {
-          /*val out = outputStream.toString
-          if (StringUtils.isNotEmpty(out)) promise.failure(new ExecuteError(30034,out))
-          else*/
           promise.failure(new ExecuteError(40007, "Pyspark process  has stopped, query failed!"))
         }
       }) {
@@ -276,16 +270,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     }
   }
 
-  /*override protected def getInitLabels(): util.List[Label[_]] = {
-    val runTypeLabel = new CodeLanguageLabel
-    runTypeLabel.setRunType(RunType.PYSPARK.toString)
-    val engineTypeLabel = getEngineTypeLabel
-    val labels = new util.ArrayList[Label[_]](2)
-    labels.add(runTypeLabel)
-    labels.add(engineTypeLabel)
-    labels
-  }*/
-
   def executeLine(code: String): ExecuteResponse = {
     if (sc.isStopped) {
       throw new IllegalStateException("Application has been stopped, please relogin to try it.")
@@ -299,6 +283,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     this.code = code
     engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> $code")
     queryLock synchronized queryLock.notify()
+    // scalastyle:off awaitresult
     Await.result(promise.future, Duration.Inf)
     lineOutputStream.flush()
     val outStr = lineOutputStream.toString()
@@ -315,13 +300,13 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     SuccessExecuteResponse()
   }
 
-  def onPythonScriptInitialized(pid: Int) = {
+  def onPythonScriptInitialized(pid: Int): Unit = {
     this.pid = Some(pid.toString)
     pythonScriptInitialized = true
     logger.info(s"Pyspark process has been initialized.pid is $pid")
   }
 
-  def getStatements = {
+  def getStatements: PythonInterpretRequest = {
     queryLock synchronized { while (code == null || !pythonScriptInitialized) queryLock.wait() }
     logger.info(
       "Prepare to deal python code, code: " + code
@@ -333,7 +318,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     request
   }
 
-  def setStatementsFinished(out: String, error: Boolean) = {
+  def setStatementsFinished(out: String, error: Boolean): Any = {
     logger.info(s"A python code finished, has some errors happened?  $error.")
     Utils.tryQuietly(Thread.sleep(10))
     if (!error) {
@@ -348,7 +333,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     }
   }
 
-  def appendOutput(message: String) = {
+  def appendOutput(message: String): Unit = {
     if (!pythonScriptInitialized) {
       logger.info(message)
     } else {
@@ -356,7 +341,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     }
   }
 
-  def appendErrorOutput(message: String) = {
+  def appendErrorOutput(message: String): Unit = {
     if (!pythonScriptInitialized) {
       logger.info(message)
     } else {
@@ -365,7 +350,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     }
   }
 
-  def showDF(jobGroup: String, df: Any) = {
+  def showDF(jobGroup: String, df: Any): Unit = {
     SQLSession.showDF(
       sc,
       jobGroup,
@@ -377,7 +362,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     logger.info("Pyspark showDF execute success!")
   }
 
-  def showAliasDF(jobGroup: String, df: Any, alias: String) = {
+  def showAliasDF(jobGroup: String, df: Any, alias: String): Unit = {
     SQLSession.showDF(
       sc,
       jobGroup,
@@ -389,7 +374,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     logger.info("Pyspark showAliasDF execute success!")
   }
 
-  def showHTML(jobGroup: String, htmlContent: Any) = {
+  def showHTML(jobGroup: String, htmlContent: Any): Unit = {
     SQLSession.showHTML(sc, jobGroup, htmlContent, this.engineExecutionContext)
     logger.info("Pyspark showHTML execute success!")
   }
@@ -411,9 +396,9 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     )
   }
 
-  def listUDFs() = UDF.listUDFs
+  def listUDFs(): Unit = UDF.listUDFs
 
-  def existsUDF(name: String) = UDF.existsUDF(name)
+  def existsUDF(name: String): Boolean = UDF.existsUDF(name)
 
   override protected def getExecutorIdPreFix: String = "SparkPythonExecutor_"
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
index f1685e7f1..de2271e54 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.util.SparkUtils
 
 import java.io.{BufferedReader, File}
+import java.util.Locale
 
 import scala.tools.nsc.interpreter.{
   isReplPower,
@@ -246,7 +247,7 @@ class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long)
   private def matchFatalLog(errorMsg: String): Boolean = {
     var flag = false
     if (StringUtils.isNotBlank(errorMsg)) {
-      val errorMsgLowCase = errorMsg.toLowerCase
+      val errorMsgLowCase = errorMsg.toLowerCase(Locale.getDefault())
       fatalLogs.foreach(fatalLog =>
         if (errorMsgLowCase.contains(fatalLog)) {
           logger.error(s"match engineConn log fatal logs,is $fatalLog")
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index ecb699ab6..9aae45664 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -62,19 +62,18 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     if (pythonLibUris.length == 2) {
       val sparkConfValue1 = Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
       val sparkConfValue2 = Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
-      if (StringUtils.isEmpty(sparkConfValue1) && StringUtils.isEmpty(sparkConfValue2))
+      if (StringUtils.isEmpty(sparkConfValue1) && StringUtils.isEmpty(sparkConfValue2)) {
         sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
-      else if (StringUtils.isEmpty(sparkConfValue1))
+      } else if (StringUtils.isEmpty(sparkConfValue1)) {
         sparkConf.set("spark.yarn.dist.files", sparkConfValue2 + "," + pythonLibUris.mkString(","))
-      else if (StringUtils.isEmpty(sparkConfValue2))
+      } else if (StringUtils.isEmpty(sparkConfValue2)) {
         sparkConf.set("spark.yarn.dist.files", sparkConfValue1 + "," + pythonLibUris.mkString(","))
-      else
+      } else {
         sparkConf.set(
           "spark.yarn.dist.files",
           sparkConfValue1 + "," + sparkConfValue2 + "," + pythonLibUris.mkString(",")
         )
-//      if (!useSparkSubmit) sparkConf.set("spark.files", sparkConf.get("spark.yarn.dist.files"))
-//      sparkConf.set("spark.submit.pyFiles", pythonLibUris.mkString(","))
+      }
     }
     // Distributes needed libraries to workers
     // when spark version is greater than or equal to 1.5.0
@@ -86,8 +85,9 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       "print current thread name " + Thread.currentThread().getContextClassLoader.toString
     )
     val sparkSession = createSparkSession(outputDir, sparkConf)
-    if (sparkSession == null)
+    if (sparkSession == null) {
       throw new SparkSessionNullException(40009, "sparkSession can not be null")
+    }
 
     val sc = sparkSession.sparkContext
     val sqlContext =
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala
index 3727516c1..4e033a524 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala
@@ -82,7 +82,7 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
       transfer(spark.sparkContext, path, encoding)
     }
     val indexs = getFilterColumnIndex(rdd, fieldDelimiter, columns, hasHeader)
-    // header处理
+    // header
     val tokenRdd = if (hasHeader) {
       rdd
         .mapPartitionsWithIndex((index, iter) => if (index == 0) iter.drop(1) else iter)
@@ -106,13 +106,6 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
       columns: List[Map[String, Any]],
       hasHeader: Boolean
   ): Array[Int] = {
-    /*if(hasHeader){
-      val columnNames: List[String] = columns.map(_.getOrElse("name",null).toString)
-      val split: Array[String] = rdd.first().split(fieldDelimiter)
-      split.indices.filter(i =>columnNames.contains(split(i))).toArray
-    }else{
-      columns.map(_.getOrElse("name",null).toString.split("_")(1).toInt -1).toArray
-    }*/
     columns.map(_.getOrElse("index", 0).toString.toInt).toArray
   }
 
@@ -202,7 +195,9 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
       isOverwrite: Boolean = false
   ): Boolean = {
     val filesystemPath = new Path(path)
+    // scalastyle:off hadoopconfiguration
     spark.sparkContext.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)
+    // scalastyle:off hadoopconfiguration
     val fs = filesystemPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
     fs.setVerifyChecksum(false)
     fs.setWriteChecksum(false)
@@ -267,12 +262,12 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
     val msg = new StringBuilder
     schema.indices.foreach { i =>
       val data = row(i) match {
-        case value: String => {
-          if (("NULL".equals(value) || "".equals(value)) && !"SHUFFLEOFF".equals(exportNullValue))
+        case value: String =>
+          if (("NULL".equals(value) || "".equals(value)) && !"SHUFFLEOFF".equals(exportNullValue)) {
             exportNullValue
-          else
+          } else {
             value.replaceAll("\n|\t", " ")
-        }
+          }
         case value: Any => value.toString
         case _ => if ("SHUFFLEOFF".equals(exportNullValue)) "NULL" else exportNullValue
       }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
index d2c94c017..187277349 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
@@ -60,11 +60,11 @@ object ExportData extends Logging {
 
     val pathType = LoadData.getMapValue[String](dest, "pathType", "share")
     val path =
-      if ("share".equals(pathType))
+      if ("share".equals(pathType)) {
         "file://" + LoadData.getMapValue[String](dest, "path")
-      else if (SparkConfiguration.IS_VIEWFS_ENV.getValue)
+      } else if (SparkConfiguration.IS_VIEWFS_ENV.getValue) {
         LoadData.getMapValue[String](dest, "path")
-      else "hdfs://" + LoadData.getMapValue[String](dest, "path")
+      } else "hdfs://" + LoadData.getMapValue[String](dest, "path")
 
     val hasHeader = LoadData.getMapValue[Boolean](dest, "hasHeader", false)
     val isCsv = LoadData.getMapValue[Boolean](dest, "isCsv", true)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
index 0dc157eff..6d278175b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types._
 
 import java.io.{BufferedInputStream, File, FileInputStream}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 
@@ -146,14 +147,16 @@ object LoadData {
             )
           }
         } else {
-          if (isOverwrite)
+          if (isOverwrite) {
             spark.sql(s"INSERT OVERWRITE TABLE  $database.$tableName select * from tempTable")
-          else
+          } else {
             spark.sql(s"INSERT INTO   $database.$tableName select * from tempTable")
+          }
         }
       } else {
-        if (spark.catalog.tableExists(database, tableName))
+        if (spark.catalog.tableExists(database, tableName)) {
           spark.sql(s"drop table if exists $database.$tableName")
+        }
         if (isPartition) {
           val columnSql = getColumnSql(columns)
           val sql =
@@ -187,8 +190,9 @@ object LoadData {
 
   def copyFileToHdfs(path: String, fs: FileSystem): String = {
     val file = new File(path)
-    if (file.isDirectory)
+    if (file.isDirectory) {
       throw new Exception("Import must be a file, not a directory(导入的必须是文件,不能是目录)")
+    }
     val in = new BufferedInputStream(new FileInputStream(file))
     val hdfsPath =
       "/tmp/" + System.getProperty("user.name") + "/" + System.currentTimeMillis + file.getName
@@ -207,18 +211,20 @@ object LoadData {
       case JNothing => default
       case value: JValue =>
         if ("JString()".equals(value.toString)) default
-        else
+        else {
           try value.extract[T]
           catch { case t: Throwable => default }
+        }
     }
   }
 
   def getMapValue[T](map: Map[String, Any], key: String, default: T = null.asInstanceOf[T]): T = {
     val value = map.get(key).map(_.asInstanceOf[T]).getOrElse(default)
-    if (StringUtils.isEmpty(value.toString))
+    if (StringUtils.isEmpty(value.toString)) {
       default
-    else
+    } else {
       value
+    }
   }
 
   def getColumnSql(columns: List[Map[String, Any]]): String = {
@@ -226,12 +232,14 @@ object LoadData {
     columns.foreach { column =>
       val name =
         if (column("name") != null) column("name").asInstanceOf[String]
-        else
+        else {
           throw new IllegalArgumentException(
             "When create a table, the field name must be defined(建立新表时,字段名必须定义)"
           )
+        }
       sql.append("`").append(name).append("` ")
-      val dataType = column.getOrElse("type", "string").asInstanceOf[String].toLowerCase
+      val dataType =
+        column.getOrElse("type", "string").asInstanceOf[String].toLowerCase(Locale.getDefault())
       sql.append(dataType)
       dataType match {
         case "char" | "varchar" =>
@@ -256,10 +264,11 @@ object LoadData {
     columns.map { column =>
       val name =
         if (column("name") != null) column("name").asInstanceOf[String]
-        else
+        else {
           throw new IllegalArgumentException(
             "When create a table, the field name must be defined(建立新表时,字段名必须定义)"
           )
+        }
       val dataType = column.getOrElse("type", "string").asInstanceOf[String]
       val precision = Utils.tryCatch(column.getOrElse("precision", 20).toString.toInt) {
         case e: Exception => 20
@@ -267,7 +276,7 @@ object LoadData {
       val scale = Utils.tryCatch(column.getOrElse("scale", 4).toString.toInt) { case e: Exception =>
         4
       }
-      StructField(name, toDataType(dataType.toLowerCase, precision, scale), true)
+      StructField(name, toDataType(dataType.toLowerCase(Locale.getDefault), precision, scale), true)
     }.toArray
   }
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala
index 0b2accd24..60c1dc1ef 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala
@@ -56,15 +56,17 @@ class MDQPostExecutionHook extends SparkPostExecutionHook with Logging {
       case l: CodeLanguageLabel => l.getCodeType
       case _ => ""
     }
-    if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType))
+    if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType)) {
       return
+    }
     val sender = Sender.getSender(SparkConfiguration.MDQ_APPLICATION_NAME.getValue)
     executeResponse match {
       case SuccessExecuteResponse() =>
         sender.ask(DDLExecuteResponse(true, code, StorageUtils.getJvmUser)) match {
           case DDLCompleteResponse(status) =>
-            if (!status)
+            if (!status) {
               logger.warn(s"failed to execute create table :$code (执行建表失败):$code")
+            }
         }
       case _ => logger.warn(s"failed to execute create table:$code (执行建表失败:$code)")
     }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala
index 3551aba66..d27086f76 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala
@@ -60,8 +60,9 @@ class MDQPreExecutionHook extends SparkPreExecutionHook with Logging {
       case _ =>
         ""
     }
-    if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType))
+    if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType)) {
       return code
+    }
     val sender = Sender.getSender(SparkConfiguration.MDQ_APPLICATION_NAME.getValue)
     val params = new util.HashMap[String, Object]()
     params.put("user", StorageUtils.getJvmUser)
@@ -71,7 +72,10 @@ class MDQPreExecutionHook extends SparkPreExecutionHook with Logging {
       resp = sender.ask(DDLRequest(params))
     } { case e: Exception =>
       logger.error(s"Call MDQ rpc failed, ${e.getMessage}", e)
-      throw new MDQErrorException(40010, s"向MDQ服务请求解析为可以执行的sql时失败, ${e.getMessage}")
+      throw new MDQErrorException(
+        40010,
+        s"The request to the MDQ service to parse into executable SQL failed(向MDQ服务请求解析为可以执行的sql时失败), ${e.getMessage}"
+      )
     }
     resp match {
       case DDLResponse(postCode) => postCode
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
index 5f460d9f5..8c197ae69 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
@@ -42,7 +42,7 @@ object EngineUtils extends Logging {
 
   def getName: String = Sender.getThisServiceInstance.getInstance
 
-  def findAvailPort = {
+  def findAvailPort: Int = {
     val socket = new ServerSocket(0)
     Utils.tryFinally(socket.getLocalPort) { Utils.tryQuietly(socket.close()) }
   }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
index ba2338279..215129e1c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
 object UDF extends Serializable {
 
   def listUDFs()(implicit spark: SparkSession): Unit =
+    // scalastyle:off println
     spark.sessionState.functionRegistry.listFunction().foreach(println)
 
   def existsUDF(name: String)(implicit spark: SparkSession): Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org