Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/08 12:13:38 UTC
[incubator-linkis] branch dev-1.3.1 updated: [linkis-engineplugin-spark] Modification of scala file floating red (#3194)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 5cf593eaa [linkis-engineplugin-spark] Modification of scala file floating red (#3194)
5cf593eaa is described below
commit 5cf593eaa01654655316870ed8ad31cc9c5e81f5
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 20:13:34 2022 +0800
[linkis-engineplugin-spark] Modification of scala file floating red (#3194)
---
.../spark/Interpreter/Interpreter.scala | 2 +-
.../spark/Interpreter/ProcessInterpreter.scala | 4 +-
.../spark/Interpreter/PythonInterpreter.scala | 8 ++--
.../args/SparkPythonArgsPreExecutionHook.scala | 4 +-
.../engineplugin/spark/common/LogContainer.scala | 6 +--
.../engineplugin/spark/common/SparkKind.scala | 4 +-
.../engineplugin/spark/cs/CSTableParser.scala | 1 +
.../engineplugin/spark/executor/SQLSession.scala | 11 ++++--
.../spark/executor/SparkEngineConnExecutor.scala | 42 ++++++++------------
.../spark/executor/SparkPythonExecutor.scala | 45 ++++++++--------------
.../spark/executor/SparkScalaExecutor.scala | 3 +-
.../spark/factory/SparkEngineConnFactory.scala | 14 +++----
.../engineplugin/spark/imexport/CsvRelation.scala | 19 ++++-----
.../engineplugin/spark/imexport/ExportData.scala | 6 +--
.../engineplugin/spark/imexport/LoadData.scala | 31 +++++++++------
.../spark/mdq/MDQPostExecutionHook.scala | 6 ++-
.../spark/mdq/MDQPreExecutionHook.scala | 8 +++-
.../engineplugin/spark/utils/EngineUtils.scala | 2 +-
.../spark/sql/execution/datasources/csv/UDF.scala | 1 +
19 files changed, 106 insertions(+), 111 deletions(-)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
index 7dd4ff6db..f6d6c797e 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/Interpreter.scala
@@ -36,7 +36,7 @@ trait Interpreter {
@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
- final def waitForStateChange(oldState: State, atMost: Duration) = {
+ final def waitForStateChange(oldState: State, atMost: Duration): Unit = {
Utils.waitUntil({ () => state != oldState }, atMost)
}
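
The change above is the pattern this commit applies throughout: public members gain explicit result types, which is what the IDE/scalastyle "floating red" warnings in the title refer to. A minimal sketch of why the rule exists (names here are hypothetical):

    import scala.concurrent.duration.Duration

    object ReturnTypeSketch {
      // Without an annotation the result type is inferred from the body,
      // so a later body edit can silently change the public signature:
      def waitMillis(atMost: Duration) = atMost.toMillis            // inferred: Long
      // Annotated, the contract is pinned down and scalastyle is satisfied:
      def waitMillisTyped(atMost: Duration): Long = atMost.toMillis
    }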
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
index f703a3b6d..171d48e4f 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/ProcessInterpreter.scala
@@ -87,6 +87,7 @@ abstract class ProcessInterpreter(process: Process) extends Interpreter with Log
Future {
val exitCode = process.waitFor()
if (exitCode != 0) {
+ // scalastyle:off println
errOut.lines.foreach(println)
println(getClass.getSimpleName + " has stopped with exit code " + process.exitValue)
_state = Error()
@@ -115,8 +116,7 @@ abstract class ProcessInterpreter(process: Process) extends Interpreter with Log
IOUtils.closeQuietly(stdin)
IOUtils.closeQuietly(stdout)
errOut.close
-
- // Give ourselves 10 seconds to tear down the process.
+ // scalastyle:off awaitresult
Utils.tryFinally(Await.result(future, Duration(10, TimeUnit.SECONDS))) {
process.destroy()
}
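
The // scalastyle:off <rule> comments added here disable the named check from that point to the end of the file, or to a matching // scalastyle:on <rule>. A minimal sketch of the paired form, which re-enables the check after the intentional violation:

    // scalastyle:off println
    println("intentional console output, e.g. process shutdown diagnostics")
    // scalastyle:on println
    // From here on, a bare println would be flagged again.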
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
index a7824b7ff..f6f5a7d67 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala
@@ -66,7 +66,7 @@ object PythonInterpreter {
new PythonInterpreter(process, gatewayServer)
}
- def pythonPath = {
+ def pythonPath: String = {
val pythonPath = new ArrayBuffer[String]
val pythonHomePath = new File(SparkConfiguration.SPARK_HOME.getValue, "python").getPath
val pythonParentPath = new File(pythonHomePath, "lib")
@@ -156,6 +156,7 @@ private class PythonInterpreter(process: Process, gatewayServer: GatewayServer)
val iterable = initOut.iterator
while (continue && iterable.hasNext) {
iterable.next match {
+ // scalastyle:off println
case "READY" => println("Start python application succeed."); continue = false
case str: String => println(str)
case _ =>
@@ -182,16 +183,15 @@ private class PythonInterpreter(process: Process, gatewayServer: GatewayServer)
}
private def sendRequest(request: Map[String, Any]): Option[JValue] = {
+ // scalastyle:off println
stdin.println(Serialization.write(request))
stdin.flush()
Option(stdout.readLine()).map { line => parse(line) }
}
- def pythonPath = {
+ def pythonPath: String = {
val pythonPath = new ArrayBuffer[String]
- // sys.env.get("SPARK_HOME").foreach { sparkHome =>
- // }
val pythonHomePath = new File(SparkConfiguration.SPARK_HOME.getValue, "python").getPath
val pythonParentPath = new File(pythonHomePath, "lib")
pythonPath += pythonHomePath
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala
index 6313b9885..fd4a9723b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/args/SparkPythonArgsPreExecutionHook.scala
@@ -64,7 +64,9 @@ class SparkPythonArgsPreExecutionHook extends SparkPreExecutionHook with Logging
engineExecutionContext.getTotalParagraph != 1 || StringUtils.isEmpty(
runType
) || !StringUtils.equals(RunType.PYSPARK.toString, runType)
- ) return code
+ ) {
+ return code
+ }
val argsArr =
if (
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala
index e476b4edf..698e52a3b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/LogContainer.scala
@@ -39,11 +39,11 @@ class LogContainer(val logSize: Int) {
}
}
- def putLogs(logs: Iterable[String]) = synchronized {
+ def putLogs(logs: Iterable[String]): Unit = synchronized {
logs.foreach(putLog)
}
- def reset() = synchronized {
+ def reset(): Unit = synchronized {
flag = 0
tail = 0
}
@@ -64,7 +64,7 @@ class LogContainer(val logSize: Int) {
}
}
- def size = {
+ def size: Int = {
if (flag == tail) 0
else if (flag > tail) tail + logSize - flag
else tail - flag
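
The explicit Int on size also makes the ring-buffer arithmetic easier to read: flag is the read index and tail the write index, both wrapping modulo logSize. A standalone restatement with worked values, assuming logSize = 4:

    def size(flag: Int, tail: Int, logSize: Int): Int =
      if (flag == tail) 0                          // empty buffer
      else if (flag > tail) tail + logSize - flag  // writer has wrapped past the end
      else tail - flag                             // no wrap-around yet

    size(3, 1, 4)  // 2: tail wrapped, two entries live
    size(1, 3, 4)  // 2: no wrap, two entries live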
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
index 67fcd9c7e..6ba8b210c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
@@ -44,12 +44,12 @@ object SparkKind {
getKind(Kind.getKind(code))
}
- def getSessionKind(code: String) = {
+ def getSessionKind(code: String): Kind with Product with Serializable = {
val kindStr = Kind.getKindString(code)
if (kindStr.indexOf("@") == 0) getKind(kindStr.substring(1)) else SparkMix()
}
- private def getKind(kindStr: String) = {
+ private def getKind(kindStr: String): Kind with Product with Serializable = {
kindStr match {
case SPARKSCALA_TYPE | SCALA_LAN => SparkScala()
case PYSPARK_TYPE | PYTHON_LAN | PYTHON_END => PySpark()
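
The compound annotation Kind with Product with Serializable is the least upper bound the compiler infers when a method can return several case classes, since every case class also extends Product and Serializable. Writing it out keeps the signature stable; annotating the plain supertype would be the other option. A self-contained sketch:

    sealed trait Kind
    case class SparkScala() extends Kind
    case class PySpark() extends Kind

    // Inferred LUB of the two branches is Kind with Product with Serializable:
    def pick(useScala: Boolean): Kind with Product with Serializable =
      if (useScala) SparkScala() else PySpark()

    // Alternatively, widen to the supertype and avoid the compound type:
    def pickWidened(useScala: Boolean): Kind =
      if (useScala) SparkScala() else PySpark()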
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
index 4a8127164..a862a45b3 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/cs/CSTableParser.scala
@@ -71,6 +71,7 @@ object CSTableParser extends Logging {
csTempTables.foreach { csTempTable =>
val table = getCSTable(csTempTable, contextIDValueStr, nodeNameStr)
if (null == table) {
+ // scalastyle:off throwerror
throw new ExecuteError(40007, s"The csTable that name is $csTempTable not found in cs")
}
registerTempTable(table)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
index d259d8824..1201665c5 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}
import java.text.NumberFormat
+import java.util.Locale
import scala.collection.mutable.ArrayBuffer
@@ -97,16 +98,20 @@ object SQLSession extends Logging {
// val columnsSet = dataFrame.schema
val columns = columnsSet
.map(c =>
- Column(c.name, DataType.toDataType(c.dataType.typeName.toLowerCase), c.getComment().orNull)
+ Column(
+ c.name,
+ DataType.toDataType(c.dataType.typeName.toLowerCase(Locale.getDefault())),
+ c.getComment().orNull
+ )
)
.toArray[Column]
columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
if (columns == null || columns.isEmpty) return
val metaData = new TableMetaData(columns)
val writer =
- if (StringUtils.isNotBlank(alias))
+ if (StringUtils.isNotBlank(alias)) {
engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE, alias)
- else engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
+ } else engineExecutionContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
writer.addMetaData(metaData)
var index = 0
Utils.tryThrow({
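
Passing an explicit Locale to toLowerCase addresses scalastyle's locale rule: the zero-argument overload implicitly uses the JVM default locale, whose case mapping can corrupt machine-readable identifiers (the classic case is Turkish dotted/dotless I). An illustration; Locale.ROOT is the usual choice when the string is an identifier rather than display text:

    import java.util.Locale

    "TITLE".toLowerCase(Locale.ROOT)        // "title" on every JVM
    "TITLE".toLowerCase(new Locale("tr"))   // "tıtle": dotless ı breaks lookups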
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 19a7d546f..35cf6801a 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -159,25 +159,25 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
executeLine(engineExecutorContext, newcode)
}
- override def progress(taskID: String): Float = if (
- jobGroup == null || engineExecutionContext.getTotalParagraph == 0
- ) ProgressUtils.getOldProgress(this.engineExecutionContext)
- else {
- val newProgress =
- (engineExecutionContext.getCurrentParagraph * 1f - 1f) / engineExecutionContext.getTotalParagraph + JobProgressUtil
- .progress(sc, jobGroup) / engineExecutionContext.getTotalParagraph
- val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress
- val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
- if (normalizedProgress < oldProgress) oldProgress
- else {
- ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext)
- normalizedProgress
+ override def progress(taskID: String): Float =
+ if (jobGroup == null || engineExecutionContext.getTotalParagraph == 0) {
+ ProgressUtils.getOldProgress(this.engineExecutionContext)
+ } else {
+ val newProgress =
+ (engineExecutionContext.getCurrentParagraph * 1f - 1f) / engineExecutionContext.getTotalParagraph + JobProgressUtil
+ .progress(sc, jobGroup) / engineExecutionContext.getTotalParagraph
+ val normalizedProgress = if (newProgress >= 1) newProgress - 0.1f else newProgress
+ val oldProgress = ProgressUtils.getOldProgress(this.engineExecutionContext)
+ if (normalizedProgress < oldProgress) oldProgress
+ else {
+ ProgressUtils.putProgress(normalizedProgress, this.engineExecutionContext)
+ normalizedProgress
+ }
}
- }
- override def getProgressInfo(taskID: String): Array[JobProgressInfo] = if (jobGroup == null)
+ override def getProgressInfo(taskID: String): Array[JobProgressInfo] = if (jobGroup == null) {
Array.empty
- else {
+ } else {
logger.debug("request new progress info for jobGroup is " + jobGroup)
val progressInfoArray = ArrayBuffer[JobProgressInfo]()
progressInfoArray ++= JobProgressUtil.getActiveJobProgressInfo(sc, jobGroup)
@@ -220,20 +220,10 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
override def getCurrentNodeResource(): NodeResource = {
logger.info("Begin to get actual used resources!")
Utils.tryCatch({
- // val driverHost: String = sc.getConf.get("spark.driver.host")
- // val executorMemList = sc.getExecutorMemoryStatus.filter(x => !x._1.split(":")(0).equals(driverHost)).map(x => x._2._1)
val executorNum: Int = sc.getConf.get("spark.executor.instances").toInt
val executorMem: Long =
ByteTimeUtils.byteStringAsBytes(sc.getConf.get("spark.executor.memory")) * executorNum
-
- // if(executorMemList.size>0) {
- // executorMem = executorMemList.reduce((x, y) => x + y)
- // }
val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sc.getConf.get("spark.driver.memory"))
- // val driverMemList = sc.getExecutorMemoryStatus.filter(x => x._1.split(":")(0).equals(driverHost)).map(x => x._2._1)
- // if(driverMemList.size > 0) {
- // driverMem = driverMemList.reduce((x, y) => x + y)
- // }
val sparkExecutorCores = sc.getConf.get("spark.executor.cores", "2").toInt * executorNum
val sparkDriverCores = sc.getConf.get("spark.driver.cores", "1").toInt
val queue = sc.getConf.get("spark.yarn.queue")
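
The reshaped progress formula: each completed paragraph contributes 1/total, and the running paragraph contributes its Spark job-group progress scaled by 1/total; results at or above 1 are pulled back by 0.1 so the task never reports completion early, and the stored old progress acts as a monotonic floor. A worked example with assumed inputs:

    // Paragraph 2 of 4 is running and its job group reports 50%:
    val newProgress = (2 * 1f - 1f) / 4 + 0.5f / 4
    // = 0.25 + 0.125 = 0.375f; below 1, so no normalization applies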
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index f2719343f..0cf862d04 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -73,8 +73,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
private val lineOutputStream = new RsOutputStream
val sqlContext = sparkEngineSession.sqlContext
val SUCCESS = "success"
- /*@throws(classOf[IOException])
- override def open = {}*/
private lazy val py4jToken: String = RandomStringUtils.randomAlphanumeric(256)
private lazy val gwBuilder: GatewayServerBuilder = {
@@ -91,11 +89,11 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
}
- def getSparkConf = sc.getConf
+ def getSparkConf: Unit = sc.getConf
- def getJavaSparkContext = new JavaSparkContext(sc)
+ def getJavaSparkContext: Unit = new JavaSparkContext(sc)
- def getSparkSession = if (sparkSession != null) sparkSession
+ def getSparkSession: Object = if (sparkSession != null) sparkSession
else () => throw new IllegalAccessException("not supported keyword spark in spark1.x versions")
override def init(): Unit = {
@@ -136,7 +134,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
override def getKind: Kind = PySpark()
private def initGateway = {
- // 如果从前端获取到用户所设置的Python版本为Python3 则取Python3的环境变量,否则默认为Python2
+ // If the python version set by the user is obtained from the front end as python3, the environment variable of python3 is taken; otherwise, the default is python2
logger.info(
s"spark.python.version => ${engineCreationContext.getOptions.get("spark.python.version")}"
)
@@ -168,7 +166,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
)
val pythonClasspath = new StringBuilder(pythonPath)
- //
val files = sc.getConf.get("spark.files", "")
logger.info(s"output spark files ${files}")
if (StringUtils.isNotEmpty(files)) {
@@ -231,9 +228,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
// close
Utils.tryFinally({
if (promise != null && !promise.isCompleted) {
- /*val out = outputStream.toString
- if (StringUtils.isNotEmpty(out)) promise.failure(new ExecuteError(30034,out))
- else*/
promise.failure(new ExecuteError(40007, "Pyspark process has stopped, query failed!"))
}
}) {
@@ -276,16 +270,6 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
}
- /*override protected def getInitLabels(): util.List[Label[_]] = {
- val runTypeLabel = new CodeLanguageLabel
- runTypeLabel.setRunType(RunType.PYSPARK.toString)
- val engineTypeLabel = getEngineTypeLabel
- val labels = new util.ArrayList[Label[_]](2)
- labels.add(runTypeLabel)
- labels.add(engineTypeLabel)
- labels
- }*/
-
def executeLine(code: String): ExecuteResponse = {
if (sc.isStopped) {
throw new IllegalStateException("Application has been stopped, please relogin to try it.")
@@ -299,6 +283,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
this.code = code
engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> $code")
queryLock synchronized queryLock.notify()
+ // scalastyle:off awaitresult
Await.result(promise.future, Duration.Inf)
lineOutputStream.flush()
val outStr = lineOutputStream.toString()
@@ -315,13 +300,13 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
SuccessExecuteResponse()
}
- def onPythonScriptInitialized(pid: Int) = {
+ def onPythonScriptInitialized(pid: Int): Unit = {
this.pid = Some(pid.toString)
pythonScriptInitialized = true
logger.info(s"Pyspark process has been initialized.pid is $pid")
}
- def getStatements = {
+ def getStatements: PythonInterpretRequest = {
queryLock synchronized { while (code == null || !pythonScriptInitialized) queryLock.wait() }
logger.info(
"Prepare to deal python code, code: " + code
@@ -333,7 +318,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
request
}
- def setStatementsFinished(out: String, error: Boolean) = {
+ def setStatementsFinished(out: String, error: Boolean): Any = {
logger.info(s"A python code finished, has some errors happened? $error.")
Utils.tryQuietly(Thread.sleep(10))
if (!error) {
@@ -348,7 +333,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
}
- def appendOutput(message: String) = {
+ def appendOutput(message: String): Unit = {
if (!pythonScriptInitialized) {
logger.info(message)
} else {
@@ -356,7 +341,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
}
- def appendErrorOutput(message: String) = {
+ def appendErrorOutput(message: String): Unit = {
if (!pythonScriptInitialized) {
logger.info(message)
} else {
@@ -365,7 +350,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
}
- def showDF(jobGroup: String, df: Any) = {
+ def showDF(jobGroup: String, df: Any): Unit = {
SQLSession.showDF(
sc,
jobGroup,
@@ -377,7 +362,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
logger.info("Pyspark showDF execute success!")
}
- def showAliasDF(jobGroup: String, df: Any, alias: String) = {
+ def showAliasDF(jobGroup: String, df: Any, alias: String): Unit = {
SQLSession.showDF(
sc,
jobGroup,
@@ -389,7 +374,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
logger.info("Pyspark showAliasDF execute success!")
}
- def showHTML(jobGroup: String, htmlContent: Any) = {
+ def showHTML(jobGroup: String, htmlContent: Any): Unit = {
SQLSession.showHTML(sc, jobGroup, htmlContent, this.engineExecutionContext)
logger.info("Pyspark showHTML execute success!")
}
@@ -411,9 +396,9 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
)
}
- def listUDFs() = UDF.listUDFs
+ def listUDFs(): Unit = UDF.listUDFs
- def existsUDF(name: String) = UDF.existsUDF(name)
+ def existsUDF(name: String): Boolean = UDF.existsUDF(name)
override protected def getExecutorIdPreFix: String = "SparkPythonExecutor_"
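
One caveat on this file: annotating getSparkConf and getJavaSparkContext as Unit makes them value-discarding, so callers receive () rather than the SparkConf or JavaSparkContext the body builds. That is harmless if they are only invoked for side effects from the Py4J side; a sketch of the semantics:

    def getConfDiscarding: Unit = "conf"  // compiles: the String is discarded, result is ()
    def getConfTyped: String = "conf"     // callers actually receive "conf"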
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
index f1685e7f1..de2271e54 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.util.SparkUtils
import java.io.{BufferedReader, File}
+import java.util.Locale
import scala.tools.nsc.interpreter.{
isReplPower,
@@ -246,7 +247,7 @@ class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long)
private def matchFatalLog(errorMsg: String): Boolean = {
var flag = false
if (StringUtils.isNotBlank(errorMsg)) {
- val errorMsgLowCase = errorMsg.toLowerCase
+ val errorMsgLowCase = errorMsg.toLowerCase(Locale.getDefault())
fatalLogs.foreach(fatalLog =>
if (errorMsgLowCase.contains(fatalLog)) {
logger.error(s"match engineConn log fatal logs,is $fatalLog")
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index ecb699ab6..9aae45664 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -62,19 +62,18 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
if (pythonLibUris.length == 2) {
val sparkConfValue1 = Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue)
val sparkConfValue2 = Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files"))
- if (StringUtils.isEmpty(sparkConfValue1) && StringUtils.isEmpty(sparkConfValue2))
+ if (StringUtils.isEmpty(sparkConfValue1) && StringUtils.isEmpty(sparkConfValue2)) {
sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(","))
- else if (StringUtils.isEmpty(sparkConfValue1))
+ } else if (StringUtils.isEmpty(sparkConfValue1)) {
sparkConf.set("spark.yarn.dist.files", sparkConfValue2 + "," + pythonLibUris.mkString(","))
- else if (StringUtils.isEmpty(sparkConfValue2))
+ } else if (StringUtils.isEmpty(sparkConfValue2)) {
sparkConf.set("spark.yarn.dist.files", sparkConfValue1 + "," + pythonLibUris.mkString(","))
- else
+ } else {
sparkConf.set(
"spark.yarn.dist.files",
sparkConfValue1 + "," + sparkConfValue2 + "," + pythonLibUris.mkString(",")
)
-// if (!useSparkSubmit) sparkConf.set("spark.files", sparkConf.get("spark.yarn.dist.files"))
-// sparkConf.set("spark.submit.pyFiles", pythonLibUris.mkString(","))
+ }
}
// Distributes needed libraries to workers
// when spark version is greater than or equal to 1.5.0
@@ -86,8 +85,9 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
"print current thread name " + Thread.currentThread().getContextClassLoader.toString
)
val sparkSession = createSparkSession(outputDir, sparkConf)
- if (sparkSession == null)
+ if (sparkSession == null) {
throw new SparkSessionNullException(40009, "sparkSession can not be null")
+ }
val sc = sparkSession.sparkContext
val sqlContext =
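
The brace-normalized merge of spark.yarn.dist.files keeps four branches, but the behaviour is simply: join whichever of the two existing values are non-empty with the new python lib URIs. An equivalent compact sketch using the same names as the code above:

    import org.apache.commons.lang3.StringUtils

    val existing = Seq(sparkConfValue1, sparkConfValue2).filter(StringUtils.isNotEmpty)
    sparkConf.set("spark.yarn.dist.files", (existing ++ pythonLibUris).mkString(","))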
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala
index 3727516c1..4e033a524 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/CsvRelation.scala
@@ -82,7 +82,7 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
transfer(spark.sparkContext, path, encoding)
}
val indexs = getFilterColumnIndex(rdd, fieldDelimiter, columns, hasHeader)
- // header处理
+ // header
val tokenRdd = if (hasHeader) {
rdd
.mapPartitionsWithIndex((index, iter) => if (index == 0) iter.drop(1) else iter)
@@ -106,13 +106,6 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
columns: List[Map[String, Any]],
hasHeader: Boolean
): Array[Int] = {
- /*if(hasHeader){
- val columnNames: List[String] = columns.map(_.getOrElse("name",null).toString)
- val split: Array[String] = rdd.first().split(fieldDelimiter)
- split.indices.filter(i =>columnNames.contains(split(i))).toArray
- }else{
- columns.map(_.getOrElse("name",null).toString.split("_")(1).toInt -1).toArray
- }*/
columns.map(_.getOrElse("index", 0).toString.toInt).toArray
}
@@ -202,7 +195,9 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
isOverwrite: Boolean = false
): Boolean = {
val filesystemPath = new Path(path)
+ // scalastyle:off hadoopconfiguration
spark.sparkContext.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)
+ // scalastyle:off hadoopconfiguration
val fs = filesystemPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.setVerifyChecksum(false)
fs.setWriteChecksum(false)
@@ -267,12 +262,12 @@ class CsvRelation(@transient private val source: Map[String, Any]) extends Seria
val msg = new StringBuilder
schema.indices.foreach { i =>
val data = row(i) match {
- case value: String => {
- if (("NULL".equals(value) || "".equals(value)) && !"SHUFFLEOFF".equals(exportNullValue))
+ case value: String =>
+ if (("NULL".equals(value) || "".equals(value)) && !"SHUFFLEOFF".equals(exportNullValue)) {
exportNullValue
- else
+ } else {
value.replaceAll("\n|\t", " ")
- }
+ }
case value: Any => value.toString
case _ => if ("SHUFFLEOFF".equals(exportNullValue)) "NULL" else exportNullValue
}
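
The restructured match encodes the export null-handling contract: string cells equal to "NULL" or "" are rewritten to the configured exportNullValue placeholder, unless the placeholder is the sentinel "SHUFFLEOFF", in which case strings pass through (with tabs/newlines flattened to spaces) and only genuine nulls are written as "NULL". Worked rows, assuming exportNullValue = "\N":

    // cell value    exportNullValue    written out
    // "NULL"        "\N"               "\N"
    // ""            "\N"               "\N"
    // "a\tb"        "\N"               "a b"    (tabs/newlines -> spaces)
    // "NULL"        "SHUFFLEOFF"       "NULL"   (pass-through)
    // null          "SHUFFLEOFF"       "NULL"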
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
index d2c94c017..187277349 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/ExportData.scala
@@ -60,11 +60,11 @@ object ExportData extends Logging {
val pathType = LoadData.getMapValue[String](dest, "pathType", "share")
val path =
- if ("share".equals(pathType))
+ if ("share".equals(pathType)) {
"file://" + LoadData.getMapValue[String](dest, "path")
- else if (SparkConfiguration.IS_VIEWFS_ENV.getValue)
+ } else if (SparkConfiguration.IS_VIEWFS_ENV.getValue) {
LoadData.getMapValue[String](dest, "path")
- else "hdfs://" + LoadData.getMapValue[String](dest, "path")
+ } else "hdfs://" + LoadData.getMapValue[String](dest, "path")
val hasHeader = LoadData.getMapValue[Boolean](dest, "hasHeader", false)
val isCsv = LoadData.getMapValue[Boolean](dest, "isCsv", true)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
index 0dc157eff..6d278175b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.io.{BufferedInputStream, File, FileInputStream}
+import java.util.Locale
import scala.collection.JavaConverters._
@@ -146,14 +147,16 @@ object LoadData {
)
}
} else {
- if (isOverwrite)
+ if (isOverwrite) {
spark.sql(s"INSERT OVERWRITE TABLE $database.$tableName select * from tempTable")
- else
+ } else {
spark.sql(s"INSERT INTO $database.$tableName select * from tempTable")
+ }
}
} else {
- if (spark.catalog.tableExists(database, tableName))
+ if (spark.catalog.tableExists(database, tableName)) {
spark.sql(s"drop table if exists $database.$tableName")
+ }
if (isPartition) {
val columnSql = getColumnSql(columns)
val sql =
@@ -187,8 +190,9 @@ object LoadData {
def copyFileToHdfs(path: String, fs: FileSystem): String = {
val file = new File(path)
- if (file.isDirectory)
+ if (file.isDirectory) {
throw new Exception("Import must be a file, not a directory(导入的必须是文件,不能是目录)")
+ }
val in = new BufferedInputStream(new FileInputStream(file))
val hdfsPath =
"/tmp/" + System.getProperty("user.name") + "/" + System.currentTimeMillis + file.getName
@@ -207,18 +211,20 @@ object LoadData {
case JNothing => default
case value: JValue =>
if ("JString()".equals(value.toString)) default
- else
+ else {
try value.extract[T]
catch { case t: Throwable => default }
+ }
}
}
def getMapValue[T](map: Map[String, Any], key: String, default: T = null.asInstanceOf[T]): T = {
val value = map.get(key).map(_.asInstanceOf[T]).getOrElse(default)
- if (StringUtils.isEmpty(value.toString))
+ if (StringUtils.isEmpty(value.toString)) {
default
- else
+ } else {
value
+ }
}
def getColumnSql(columns: List[Map[String, Any]]): String = {
@@ -226,12 +232,14 @@ object LoadData {
columns.foreach { column =>
val name =
if (column("name") != null) column("name").asInstanceOf[String]
- else
+ else {
throw new IllegalArgumentException(
"When create a table, the field name must be defined(建立新表时,字段名必须定义)"
)
+ }
sql.append("`").append(name).append("` ")
- val dataType = column.getOrElse("type", "string").asInstanceOf[String].toLowerCase
+ val dataType =
+ column.getOrElse("type", "string").asInstanceOf[String].toLowerCase(Locale.getDefault())
sql.append(dataType)
dataType match {
case "char" | "varchar" =>
@@ -256,10 +264,11 @@ object LoadData {
columns.map { column =>
val name =
if (column("name") != null) column("name").asInstanceOf[String]
- else
+ else {
throw new IllegalArgumentException(
"When create a table, the field name must be defined(建立新表时,字段名必须定义)"
)
+ }
val dataType = column.getOrElse("type", "string").asInstanceOf[String]
val precision = Utils.tryCatch(column.getOrElse("precision", 20).toString.toInt) {
case e: Exception => 20
@@ -267,7 +276,7 @@ object LoadData {
val scale = Utils.tryCatch(column.getOrElse("scale", 4).toString.toInt) { case e: Exception =>
4
}
- StructField(name, toDataType(dataType.toLowerCase, precision, scale), true)
+ StructField(name, toDataType(dataType.toLowerCase(Locale.getDefault), precision, scale), true)
}.toArray
}
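
getMapValue pairs a typed lookup with a caller-supplied default, itself defaulting to null.asInstanceOf[T] (null for reference types, the zero value for primitives under erasure). A usage sketch against a hypothetical source map:

    val dest = Map[String, Any]("pathType" -> "share", "hasHeader" -> true)
    LoadData.getMapValue[String](dest, "pathType", "share")  // "share"
    LoadData.getMapValue[Boolean](dest, "hasHeader", false)  // true
    LoadData.getMapValue[String](dest, "sep", ",")           // ",": key absent, default used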
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala
index 0b2accd24..60c1dc1ef 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPostExecutionHook.scala
@@ -56,15 +56,17 @@ class MDQPostExecutionHook extends SparkPostExecutionHook with Logging {
case l: CodeLanguageLabel => l.getCodeType
case _ => ""
}
- if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType))
+ if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType)) {
return
+ }
val sender = Sender.getSender(SparkConfiguration.MDQ_APPLICATION_NAME.getValue)
executeResponse match {
case SuccessExecuteResponse() =>
sender.ask(DDLExecuteResponse(true, code, StorageUtils.getJvmUser)) match {
case DDLCompleteResponse(status) =>
- if (!status)
+ if (!status) {
logger.warn(s"failed to execute create table :$code (执行建表失败):$code")
+ }
}
case _ => logger.warn(s"failed to execute create table:$code (执行建表失败:$code)")
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala
index 3551aba66..d27086f76 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/mdq/MDQPreExecutionHook.scala
@@ -60,8 +60,9 @@ class MDQPreExecutionHook extends SparkPreExecutionHook with Logging {
case _ =>
""
}
- if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType))
+ if (StringUtils.isEmpty(runType) || !SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType)) {
return code
+ }
val sender = Sender.getSender(SparkConfiguration.MDQ_APPLICATION_NAME.getValue)
val params = new util.HashMap[String, Object]()
params.put("user", StorageUtils.getJvmUser)
@@ -71,7 +72,10 @@ class MDQPreExecutionHook extends SparkPreExecutionHook with Logging {
resp = sender.ask(DDLRequest(params))
} { case e: Exception =>
logger.error(s"Call MDQ rpc failed, ${e.getMessage}", e)
- throw new MDQErrorException(40010, s"向MDQ服务请求解析为可以执行的sql时失败, ${e.getMessage}")
+ throw new MDQErrorException(
+ 40010,
+ s"The request to the MDQ service to parse into executable SQL failed(向MDQ服务请求解析为可以执行的sql时失败), ${e.getMessage}"
+ )
}
resp match {
case DDLResponse(postCode) => postCode
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
index 5f460d9f5..8c197ae69 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/EngineUtils.scala
@@ -42,7 +42,7 @@ object EngineUtils extends Logging {
def getName: String = Sender.getThisServiceInstance.getInstance
- def findAvailPort = {
+ def findAvailPort: Int = {
val socket = new ServerSocket(0)
Utils.tryFinally(socket.getLocalPort) { Utils.tryQuietly(socket.close()) }
}
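
findAvailPort uses the standard ephemeral-port trick: binding a ServerSocket to port 0 makes the OS assign any free port, which is read back before the socket is closed. The port can in principle be reclaimed between close and reuse, which is acceptable for handing a port to a cooperating local process. A standalone equivalent without the Linkis Utils helpers:

    import java.net.ServerSocket

    def findAvailPort: Int = {
      val socket = new ServerSocket(0) // port 0: let the OS pick a free port
      try socket.getLocalPort
      finally socket.close()           // release it so the caller can bind
    }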
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
index ba2338279..215129e1c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UDF.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
object UDF extends Serializable {
def listUDFs()(implicit spark: SparkSession): Unit =
+ // scalastyle:off println
spark.sessionState.functionRegistry.listFunction().foreach(println)
def existsUDF(name: String)(implicit spark: SparkSession): Boolean = {
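
Both UDF helpers take the SparkSession as an implicit parameter, so call sites supply it once in scope. A usage sketch, assuming a local session:

    import org.apache.spark.sql.SparkSession

    implicit val spark: SparkSession =
      SparkSession.builder().master("local[*]").appName("udf-check").getOrCreate()

    UDF.existsUDF("upper")  // true: "upper" is in the built-in function registry
    UDF.listUDFs()          // prints every registered function name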
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org