You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/07/28 07:45:34 UTC

[incubator-linkis] branch dev-1.2.0 updated: fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style (#2536)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new d4cfbac12 fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style (#2536)
d4cfbac12 is described below

commit d4cfbac12a30ff3dec2567eee24ebaf0de5c4187
Author: Jack Xu <xu...@126.com>
AuthorDate: Thu Jul 28 15:45:28 2022 +0800

    fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style (#2536)
    
    * fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style
---
 .../org/apache/linkis/common/log/LogUtils.scala    | 17 ++++++------
 .../org/apache/linkis/common/utils/JsonUtils.scala |  2 --
 .../scala/org/apache/linkis/server/Message.scala   | 27 +++++++++----------
 .../org/apache/linkis/rpc/BaseRPCSender.scala      | 14 +++++-----
 .../org/apache/linkis/rpc/RPCReceiveRestful.scala  | 21 ++++++++-------
 .../org/apache/linkis/rpc/RPCSpringBeanCache.scala | 13 +++++----
 .../org/apache/linkis/rpc/ReceiverChooser.scala    |  4 +--
 .../SpringCloudFeignConfigurationCache.scala       |  4 +--
 .../linkis/rpc/sender/SpringMVCRPCSender.scala     | 10 +++----
 .../org/apache/linkis/scheduler/queue/Job.scala    | 31 +++++++++++-----------
 .../async/AsyncConcurrentComputationExecutor.scala |  5 +++-
 .../service/DefaultExecutorHeartbeatService.scala  |  2 +-
 .../persistence/QueryPersistenceManager.java       | 11 +-------
 .../linkis/entrance/execute/EntranceExecutor.scala |  4 ++-
 .../apache/linkis/manager/am/utils/AMUtils.scala   | 24 ++++++++---------
 .../rm/external/yarn/YarnResourceRequester.scala   |  6 +++--
 .../executor/OpenLooKengEngineConnExecutor.java    | 27 ++++++++++++++++---
 .../execution/impl/AbstractExecutionFactory.scala  |  3 +--
 .../execution/impl/BaseTaskScheduler.scala         | 16 ++++-------
 .../metadata/hive/dto/MetadataQueryParam.java      |  6 ++---
 .../label/client/InstanceLabelClient.scala         |  2 +-
 .../service/impl/JobHistoryQueryServiceImpl.scala  | 11 ++++----
 22 files changed, 134 insertions(+), 126 deletions(-)

diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
index 9b66291c5..0a776b0ef 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
@@ -25,34 +25,33 @@ import java.util.Date
 object LogUtils {
 
 
-  private def getTimeFormat:String = {
-    val simpleDateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.mmm")
+  private def getTimeFormat: String = {
+    val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.mmm")
     val now = new Date(System.currentTimeMillis())
     simpleDateFormat.format(now)
-    //now.toString(ISODateTimeFormat.yearMonthDay()) + " " + now.toString(ISODateTimeFormat.hourMinuteSecondMillis())
   }
 
-  def generateInfo(rawLog:String):String = {
+  def generateInfo(rawLog: String): String = {
     getTimeFormat + " " + "INFO" + " " + rawLog
   }
 
-  def generateERROR(rawLog:String):String = {
+  def generateERROR(rawLog: String): String = {
     getTimeFormat + " " + "ERROR" + " " + rawLog
   }
 
-  def generateWarn(rawLog:String):String = {
+  def generateWarn(rawLog: String): String = {
     getTimeFormat + " " + "WARN" + " " + rawLog
   }
 
-  def generateSystemInfo(rawLog:String):String = {
+  def generateSystemInfo(rawLog: String): String = {
     getTimeFormat + " " + "SYSTEM-INFO" + " " + rawLog
   }
 
-  def generateSystemError(rawLog:String):String = {
+  def generateSystemError(rawLog: String): String = {
     getTimeFormat + " " + "SYSTEM-ERROR" + " " + rawLog
   }
 
-  def generateSystemWarn(rawLog:String):String = {
+  def generateSystemWarn(rawLog: String): String = {
     getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
   }
 
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala
index 44a2d1f13..05ac7bd21 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
 
 object JsonUtils {
 
-  //TODO add gson
-
   implicit val jackson = new ObjectMapper().setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"))
 
 }
diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala
index c1f8686af..d603861f3 100644
--- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala
+++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala
@@ -17,14 +17,13 @@
 
 package org.apache.linkis.server
 
-import java.util
-import javax.servlet.http.HttpServletRequest
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.linkis.common.utils.Logging
-
 import org.springframework.web.context.request.{RequestContextHolder, ServletRequestAttributes}
 
+import java.util
+import javax.servlet.http.HttpServletRequest
+
 
 class Message(private var method: String,
               private var status: Int = 0,          //-1 no login, 0 success, 1 error, 2 validate failed, 3 auth failed, 4 warning
@@ -59,13 +58,10 @@ class Message(private var method: String,
   def setData(data: util.HashMap[String, Object]): Unit = this.data = data
   def getData: util.HashMap[String, Object] = data
 
-  //  def isSuccess = status == 0
-  //  def isError = status != 0
-
   override def toString: String = s"Message($getMethod, $getStatus, $getData)"
 }
 
-object Message extends Logging {
+object Message {
 
   def apply(method: String = null, status: Int = 0, message: String = null,
             data: util.HashMap[String, Object] = new util.HashMap[String, Object]): Message = {
@@ -94,17 +90,21 @@ object Message extends Logging {
   }
   def error(msg: String): Message = error(msg, null)
   implicit def error(t: Throwable): Message = {
-    Message(status = 1).setMessage(ExceptionUtils.getRootCauseMessage(t)) << ("stack", ExceptionUtils.getStackTrace(t))
+    error(ExceptionUtils.getRootCauseMessage(t), t)
   }
+
   implicit def error(e: (String, Throwable)): Message = error(e._1, e._2)
   implicit def error(msg: String, t: Throwable): Message = {
-    val message = Message(status = 1)
+    error(msg, t, MessageStatus.ERROR)
+  }
+  implicit def error(msg: String, t: Throwable, status: Int): Message = {
+    val message = Message(status = status)
     message.setMessage(msg)
     if(t != null) message << ("stack", ExceptionUtils.getStackTrace(t))
     message
   }
   implicit def warn(msg: String): Message = {
-    val message = Message(status = 4)
+    val message = Message(status = MessageStatus.WARNING)
     message.setMessage(msg)
     message
   }
@@ -121,10 +121,7 @@ object Message extends Logging {
   }
 
   def noLogin(msg: String, t: Throwable): Message = {
-    val message = Message(status = -1)
-    message.setMessage(msg)
-    if(t != null) message << ("stack", ExceptionUtils.getStackTrace(t))
-    message
+    error(msg, t, MessageStatus.NO_LOGIN)
   }
   def noLogin(msg: String): Message = noLogin(msg, null)
 
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
index 94f3e2912..b41c397af 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
@@ -67,7 +67,7 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
   protected def newRPC: RPCReceiveRemote = {
     val builder = Feign.builder.logger(new Slf4jLogger()).logLevel(feign.Logger.Level.FULL)
     doBuilder(builder)
-    var url = if(name.startsWith("http://")) name else "http://" + name
+    var url = if (name.startsWith("http://")) name else "http://" + name
     if(url.endsWith("/")) url = url.substring(0, url.length - 1)
     url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
     builder.target(classOf[RPCReceiveRemote], url)
@@ -80,14 +80,14 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
     case _ => op
   }
 
-  override def ask(message: Any): Any = execute(message){
+  override def ask(message: Any): Any = execute(message) {
     val msg = RPCProduct.getRPCProduct.toMessage(message)
     BaseRPCSender.addInstanceInfo(msg.getData)
     val response = getRPC.receiveAndReply(msg)
     RPCConsumer.getRPCConsumer.toObject(response)
   }
 
-  override def ask(message: Any, timeout: Duration): Any = execute(message){
+  override def ask(message: Any, timeout: Duration): Any = execute(message) {
     val msg = RPCProduct.getRPCProduct.toMessage(message)
     msg.data("duration", timeout.toMillis)
     BaseRPCSender.addInstanceInfo(msg.getData)
@@ -95,7 +95,7 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
     RPCConsumer.getRPCConsumer.toObject(response)
   }
 
-  private def sendIt(message: Any, op: Message => Message): Unit = execute(message){
+  private def sendIt(message: Any, op: Message => Message): Unit = execute(message) {
     val msg = RPCProduct.getRPCProduct.toMessage(message)
     BaseRPCSender.addInstanceInfo(msg.getData)
     RPCConsumer.getRPCConsumer.toObject(op(msg)) match {
@@ -118,13 +118,13 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
 
   protected def getRPCSenderListenerBus = BaseRPCSender.rpcSenderListenerBus
 
-  override def equals(obj: scala.Any): Boolean = if(obj == null) false
+  override def equals(obj: scala.Any): Boolean = if (obj == null) false
     else obj match {
       case sender: BaseRPCSender => name == sender.name
       case _ => false
     }
 
-  override def hashCode(): Int = if(name == null) 0 else name.hashCode
+  override def hashCode(): Int = if (name == null) 0 else name.hashCode
 
   override def toString: String = s"RPCSender($name)"
 }
@@ -139,7 +139,7 @@ private[rpc] object BaseRPCSender extends Logging {
     override def onMessageEventError(event: RPCMessageEvent, t: Throwable): Unit =
       logger.warn(s"${event.serviceInstance} deliver RPC message failed! Message: " + event.message, t)
   })
-  def addInstanceInfo[T](map: util.Map[String, T]): Unit ={
+  def addInstanceInfo[T](map: util.Map[String, T]): Unit = {
     map.put("name", DataWorkCloudApplication.getApplicationName.asInstanceOf[T])
     map.put("instance", DataWorkCloudApplication.getInstance.asInstanceOf[T])
   }
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
index 05669ef13..2e166738d 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
@@ -64,14 +64,17 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
   }
 
   @PostConstruct
-  def initListenerBus(): Unit =  {
-    if(!receiverChoosers.exists(_.isInstanceOf[CommonReceiverChooser]))
+  def initListenerBus(): Unit = {
+    if (!receiverChoosers.exists(_.isInstanceOf[CommonReceiverChooser])) {
       receiverChoosers = receiverChoosers :+ new CommonReceiverChooser
-    if(!receiverChoosers.exists(_.isInstanceOf[MessageReceiverChooser]))
+    }
+    if (!receiverChoosers.exists(_.isInstanceOf[MessageReceiverChooser])) {
       receiverChoosers = receiverChoosers :+ new MessageReceiverChooser
+    }
     logger.info("init all receiverChoosers in spring beans, list => " + receiverChoosers.toList)
-    if(!receiverSenderBuilders.exists(_.isInstanceOf[CommonReceiverSenderBuilder]))
+    if (!receiverSenderBuilders.exists(_.isInstanceOf[CommonReceiverSenderBuilder])) {
       receiverSenderBuilders = receiverSenderBuilders :+ new CommonReceiverSenderBuilder
+    }
     receiverSenderBuilders = receiverSenderBuilders.sortBy(_.order)
     logger.info("init all receiverSenderBuilders in spring beans, list => " + receiverSenderBuilders.toList)
     val queueSize = BDP_RPC_RECEIVER_ASYN_QUEUE_CAPACITY.acquireNew
@@ -93,7 +96,7 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
     rpcReceiverListenerBus.start()
   }
 
-  private def addBroadcastListener(broadcastListener: BroadcastListener): Unit = if(rpcReceiverListenerBus != null) {
+  private def addBroadcastListener(broadcastListener: BroadcastListener): Unit = if (rpcReceiverListenerBus != null) {
     logger.info("add a new RPCBroadcastListener => " + broadcastListener.getClass)
     rpcReceiverListenerBus.addListener(new RPCMessageEventListener {
       val listenerName = broadcastListener.getClass.getSimpleName
@@ -114,7 +117,7 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
       RPCProduct.getRPCProduct.toMessage(obj)
   }
 
-  @RequestMapping(path = Array("/rpc/receive"),method = Array(RequestMethod.POST))
+  @RequestMapping(path = Array("/rpc/receive"), method = Array(RequestMethod.POST))
   override def receive(@RequestBody message: Message): Message = catchIt {
     val obj = RPCConsumer.getRPCConsumer.toObject(message)
     val event = RPCMessageEvent(obj, BaseRPCSender.getInstanceInfo(message.getData))
@@ -128,13 +131,13 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
     event.map(opEvent(_, obj, event)).getOrElse(RPCProduct.getRPCProduct.notFound())
   }
 
-  @RequestMapping(path = Array("/rpc/receiveAndReply"),method = Array(RequestMethod.POST))
+  @RequestMapping(path = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST))
   override def receiveAndReply(@RequestBody message: Message): Message = receiveAndReply(message, _.receiveAndReply(_, _))
 
-  @RequestMapping(path = Array("/rpc/replyInMills"),method = Array(RequestMethod.POST))
+  @RequestMapping(path = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST))
   override def receiveAndReplyInMills(@RequestBody message: Message): Message = catchIt {
     val duration = message.getData.get("duration")
-    if(duration == null || StringUtils.isEmpty(duration.toString)) throw new DWCURIException(10002, "The timeout period is not set!(超时时间未设置!)")
+    if (duration == null || StringUtils.isEmpty(duration.toString)) throw new DWCURIException(10002, "The timeout period is not set!(超时时间未设置!)")
     val timeout = Duration(duration.toString.toLong, TimeUnit.MILLISECONDS)
     receiveAndReply(message, _.receiveAndReply(_, timeout, _))
   }
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
index 32fea4cc8..e26a63d0c 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
@@ -37,29 +37,32 @@ private[rpc] object RPCSpringBeanCache extends Logging {
   private var rpcReceiveRestful: RPCReceiveRestful = _
 
   def registerReceiver(receiverName: String, receiver: Receiver): Unit = {
-    if(beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
+    if (beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
     logger.info(s"register a new receiver with name $receiverName, receiver is " + receiver)
     beanNameToReceivers synchronized beanNameToReceivers.put(receiverName, receiver)
   }
   def registerReceiverChooser(receiverChooser: ReceiverChooser): Unit = {
-    if(rpcReceiveRestful == null)
+    if (rpcReceiveRestful == null) {
       rpcReceiveRestful = getApplicationContext.getBean(classOf[RPCReceiveRestful])
+    }
     rpcReceiveRestful.registerReceiverChooser(receiverChooser)
   }
   def registerBroadcastListener(broadcastListener: BroadcastListener): Unit = {
-    if(rpcReceiveRestful == null)
+    if (rpcReceiveRestful == null) {
       rpcReceiveRestful = getApplicationContext.getBean(classOf[RPCReceiveRestful])
+    }
     rpcReceiveRestful.registerBroadcastListener(broadcastListener)
   }
 
   def getRPCReceiveRestful: RPCReceiveRestful = {
-    if(rpcReceiveRestful == null)
+    if (rpcReceiveRestful == null) {
       rpcReceiveRestful = getApplicationContext.getBean(classOf[RPCReceiveRestful])
+    }
     rpcReceiveRestful
   }
 
   private[rpc] def getReceivers: util.Map[String, Receiver] = {
-    if(beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
+    if (beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
     beanNameToReceivers
   }
 
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala
index 285df4f1d..1012e1367 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala
@@ -29,10 +29,10 @@ trait ReceiverChooser {
 }
 
 class CommonReceiverChooser extends ReceiverChooser {
-  override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = if(getReceivers.size() == 1) Some(getReceivers.values.iterator.next)
+  override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = if (getReceivers.size() == 1) Some(getReceivers.values.iterator.next)
   else {
     var receiver = getReceivers.get(event.serviceInstance.getApplicationName)
-    if(receiver == null) receiver = getReceivers.get("receiver")
+    if (receiver == null) receiver = getReceivers.get("receiver")
     Option(receiver)
   }
 }
\ No newline at end of file
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala
index 45c4aee46..667dd52ff 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala
@@ -71,13 +71,13 @@ private[linkis] object SpringCloudFeignConfigurationCache {
   private[rpc] def getDecoder = decoder
   private[rpc] def getContract = contract
   private[rpc] def getClient = {
-    if(client == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
+    if (client == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
     client
   }
   private[rpc] def getClientFactory = clientFactory
   private[rpc] def getLoadBalancedRetryFactory = loadBalancedRetryFactory
   private[linkis] def getDiscoveryClient = {
-    if(discoveryClient == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
+    if (discoveryClient == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
     discoveryClient
   }
   private[rpc] def getRPCTicketIdRequestInterceptor = rpcTicketIdRequestInterceptor
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
index d1db13a14..e5784d95f 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
@@ -55,7 +55,7 @@ private[rpc] class SpringMVCRPCSender private[rpc](private[rpc] val serviceInsta
         new FeignLoadBalancer(getClientFactory.getLoadBalancer(clientName), getClientFactory.getClientConfig(clientName), serverIntrospector) {
           override def customizeLoadBalancerCommandBuilder(request: FeignLoadBalancer.RibbonRequest, config: IClientConfig,
                                                            builder: LoadBalancerCommand.Builder[FeignLoadBalancer.RibbonResponse]): Unit = {
-            val instance = if(getRPCLoadBalancers.isEmpty) None else {
+            val instance = if (getRPCLoadBalancers.isEmpty) None else {
               val requestBody = SpringMVCRPCSender.getRequest(request).body()
               val requestStr = new String(requestBody, DWCConfiguration.BDP_ENCODING.getValue)
               val obj = RPCConsumer.getRPCConsumer.toObject(BDPJettyServerHelper.gson.fromJson(requestStr, classOf[Message]))
@@ -97,7 +97,7 @@ private[rpc] class SpringMVCRPCSender private[rpc](private[rpc] val serviceInsta
     */
   override def deliver(message: Any): Unit = getRPCSenderListenerBus.post(RPCMessageEvent(message, serviceInstance))
 
-  override def equals(obj: Any): Boolean = if(obj == null) false
+  override def equals(obj: Any): Boolean = if (obj == null) false
     else obj match {
       case sender: SpringMVCRPCSender => sender.serviceInstance == serviceInstance
       case _ => false
@@ -105,14 +105,14 @@ private[rpc] class SpringMVCRPCSender private[rpc](private[rpc] val serviceInsta
 
   override def hashCode(): Int = serviceInstance.hashCode()
 
-  override val toString: String = if(StringUtils.isBlank(serviceInstance.getInstance)) s"RPCSender(${serviceInstance.getApplicationName})"
+  override val toString: String = if (StringUtils.isBlank(serviceInstance.getInstance)) s"RPCSender(${serviceInstance.getApplicationName})"
     else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})"
 }
 private object SpringMVCRPCSender {
   private var requestField: Field = _
   def getRequest(req: ClientRequest): Request = {
-    if(requestField == null) synchronized {
-      if(requestField == null) {
+    if (requestField == null) synchronized {
+      if (requestField == null) {
         requestField = req.getClass.getDeclaredField("request")
         requestField.setAccessible(true)
       }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
index b4e95f093..74d498760 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
@@ -54,8 +54,9 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
   private var retryNum = 0
   private[linkis] var errorExecuteResponse: ErrorExecuteResponse = _
 
-  override def isWaiting = super.isWaiting && !interrupt
-  override def isCompleted = super.isCompleted || interrupt
+  override def isWaiting: Boolean = super.isWaiting && !interrupt
+
+  override def isCompleted: Boolean = super.isCompleted || interrupt
 
   def kill(): Unit = onFailure("Job is killed by user!", null)
 
@@ -129,27 +130,27 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
   }
 
 
-  def setListenerEventBus(eventListenerBus: ListenerEventBus[_<: SchedulerListener, _<: ScheduleEvent]) = this.eventListenerBus = eventListenerBus
+  def setListenerEventBus(eventListenerBus: ListenerEventBus[_ <: SchedulerListener, _ <: ScheduleEvent]): Unit = this.eventListenerBus = eventListenerBus
 
-  def setExecutor(executor: Executor) = this.executor = executor
+  def setExecutor(executor: Executor): Unit = this.executor = executor
   protected def getExecutor = executor
 
-  def setJobListener(jobListener: JobListener) = this.jobListener = Some(jobListener)
+  def setJobListener(jobListener: JobListener): Unit = this.jobListener = Some(jobListener)
 
-  def getJobListener = jobListener
+  def getJobListener: Option[JobListener] = jobListener
 
-  def setLogListener(logListener: LogListener) = this.logListener = Some(logListener)
+  def setLogListener(logListener: LogListener): Unit = this.logListener = Some(logListener)
 
-  def getLogListener = logListener
+  def getLogListener: Option[LogListener] = logListener
 
 
-  def setProgressListener(progressListener: ProgressListener) = this.progressListener = Some(progressListener)
+  def setProgressListener(progressListener: ProgressListener): Unit = this.progressListener = Some(progressListener)
 
-  def getProgressListener = progressListener
+  def getProgressListener: Option[ProgressListener] = progressListener
 
-  def getProgress = progress
+  def getProgress: Float = progress
 
-  def setProgress(progress: Float) = this.progress = progress
+  def setProgress(progress: Float): Unit = this.progress = progress
 
   @throws[Exception]
   def init(): Unit
@@ -161,7 +162,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
 
   def getJobInfo: JobInfo
 
-  def getErrorResponse = errorExecuteResponse
+  def getErrorResponse: ErrorExecuteResponse = errorExecuteResponse
 
   protected def existsJobDaemon: Boolean = false
 
@@ -223,7 +224,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
     IOUtils.closeQuietly(this)
   }
   def isJobSupportRetry: Boolean = true
-  def getRetryNum = retryNum
+  def getRetryNum: Int = retryNum
   protected def getMaxRetryNum: Int = 2
   protected def isJobShouldRetry(errorExecuteResponse: ErrorExecuteResponse): Boolean =
     isJobSupportRetry && errorExecuteResponse != null && (errorExecuteResponse.t match {
@@ -261,7 +262,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
       close()
       return
     }
-    val rs = Utils.tryCatch(executor.execute(jobToExecuteRequest)){
+    val rs = Utils.tryCatch(executor.execute(jobToExecuteRequest)) {
       case t: InterruptedException =>
         logger.warn(s"job $toString is interrupted by user!", t)
         ErrorExecuteResponse("job is interrupted by user!", t)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
index 7091d259b..67ac2d448 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
@@ -99,7 +99,10 @@ abstract class AsyncConcurrentComputationExecutor(override val outputPrintLimit:
     response match {
       case e: ErrorExecuteResponse =>
         logger.error("execute code failed!", e.t)
-        engineExecutionContext.appendStdout(LogUtils.generateERROR(s"execute code failed!: ${ExceptionUtils.getStackTrace(e.t)}"))
+        val errorStr = if (e.t != null) {
+          ExceptionUtils.getStackTrace(e.t)
+        } else StringUtils.EMPTY
+        engineExecutionContext.appendStdout(LogUtils.generateERROR(s"execute code failed!: $errorStr"))
       case SuccessExecuteResponse() =>
         logger.info(s"task{${engineConnTask.getTaskId} execute success")
       case e: OutputExecuteResponse =>
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
index 837a3a0dd..0fdda710f 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
@@ -58,7 +58,7 @@ class DefaultExecutorHeartbeatService extends ExecutorHeartbeatService with Node
     val heartbeatTime = AccessibleExecutorConfiguration.ENGINECONN_HEARTBEAT_TIME.getValue.toLong
     Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = Utils.tryAndWarn {
-        if (EngineConnObject.isReady){
+        if (EngineConnObject.isReady) {
           val executor = ExecutorManager.getInstance.getReportExecutor
           reportHeartBeatMsg(executor)
         }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index f3140db74..1f71e198d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -40,13 +40,11 @@ import scala.Option;
 import scala.Tuple2;
 
 public class QueryPersistenceManager extends PersistenceManager {
+    private static final Logger logger = LoggerFactory.getLogger(QueryPersistenceManager.class);
 
     private EntranceContext entranceContext;
     private PersistenceEngine persistenceEngine;
     private ResultSetEngine resultSetEngine;
-    private static final Logger logger = LoggerFactory.getLogger(QueryPersistenceManager.class);
-    //  private EntranceWebSocketService entranceWebSocketService; //TODO The latter version, to be
-    // removed, webSocket unified walk ListenerBus(后面的版本,要去掉,webSocket统一走ListenerBus)
 
     private CliHeartbeatMonitor cliHeartbeatMonitor;
 
@@ -66,13 +64,6 @@ public class QueryPersistenceManager extends PersistenceManager {
         this.resultSetEngine = resultSetEngine;
     }
 
-    // TODO The latter version, to be removed, webSocket unified walk
-    // ListenerBus(后面的版本,要去掉,webSocket统一走ListenerBus)
-    //    public void setEntranceWebSocketService(EntranceWebSocketService entranceWebSocketService)
-    // {
-    //        this.entranceWebSocketService = entranceWebSocketService;
-    //    }
-
     @Override
     public EntranceContext getEntranceContext() {
         return entranceContext;
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index 83a386935..d3b0a11aa 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.entrance.execute
 
 import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
@@ -181,7 +182,8 @@ class EngineExecuteAsyncReturn(val request: ExecuteRequest,
           case entranceExecuteRequest: EntranceExecuteRequest =>
             r match {
               case ErrorExecuteResponse(errorMsg, error) =>
-                val msg = s"Job with execId-$id + subJobId : $subJobId  execute failed,$errorMsg \n ${ExceptionUtils.getStackTrace(error)}"
+                val errorStackTrace = if (error != null) ExceptionUtils.getStackTrace(error) else StringUtils.EMPTY
+                val msg = s"Job with execId-$id + subJobId : $subJobId  execute failed,$errorMsg \n $errorStackTrace"
                 entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateERROR(msg)))
               case _ =>
             }
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
index 0bca10dad..397035d9a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
@@ -17,20 +17,18 @@
  
 package org.apache.linkis.manager.am.utils
 
-import java.{lang, util}
-
-import com.google.gson.{Gson, JsonObject}
+import com.google.gson.JsonObject
 import org.apache.linkis.manager.am.vo.{AMEngineNodeVo, EMNodeVo}
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode}
-import org.apache.linkis.manager.common.entity.resource.{DriverAndYarnResource, LoadInstanceResource, Resource, ResourceSerializer, ResourceType}
+import org.apache.linkis.manager.common.entity.resource.{DriverAndYarnResource, Resource, ResourceSerializer, ResourceType}
 import org.apache.linkis.manager.common.serializer.NodeResourceSerializer
-import org.apache.linkis.manager.common.utils.ResourceUtils
 import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel
 import org.apache.linkis.server.BDPJettyServerHelper
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization.write
 
+import java.util
 import scala.collection.JavaConverters._
 
 object AMUtils {
@@ -40,7 +38,7 @@ object AMUtils {
   implicit val formats = DefaultFormats + ResourceSerializer + NodeResourceSerializer
   val mapper = BDPJettyServerHelper.jacksonJson
 
-  def copyToEMVo(EMNodes: Array[EMNode]): util.ArrayList[EMNodeVo]= {
+  def copyToEMVo(EMNodes: Array[EMNode]): util.ArrayList[EMNodeVo] = {
     val EMNodeVos = new util.ArrayList[EMNodeVo]()
     EMNodes.foreach(node => {
       val EMNodeVo = new EMNodeVo
@@ -129,9 +127,9 @@ object AMUtils {
           if(node.getNodeTaskInfo.getSucceedTasks != null) AMEngineNodeVo.setSucceedTasks(node.getNodeTaskInfo.getSucceedTasks)
           if(node.getNodeTaskInfo.getFailedTasks != null) AMEngineNodeVo.setFailedTasks(node.getNodeTaskInfo.getFailedTasks)
         }
-        if(node.getNodeOverLoadInfo != null){
-          if(node.getNodeOverLoadInfo.getMaxMemory != null) AMEngineNodeVo.setMaxMemory(node.getNodeOverLoadInfo.getMaxMemory)
-          if(node.getNodeOverLoadInfo.getUsedMemory != null) AMEngineNodeVo.setUsedMemory(node.getNodeOverLoadInfo.getUsedMemory)
+        if (node.getNodeOverLoadInfo != null){
+          if (node.getNodeOverLoadInfo.getMaxMemory != null) AMEngineNodeVo.setMaxMemory(node.getNodeOverLoadInfo.getMaxMemory)
+          if (node.getNodeOverLoadInfo.getUsedMemory != null) AMEngineNodeVo.setUsedMemory(node.getNodeOverLoadInfo.getUsedMemory)
           if(node.getNodeOverLoadInfo.getSystemCPUUsed != null) AMEngineNodeVo.setSystemCPUUsed(node.getNodeOverLoadInfo.getSystemCPUUsed)
           if(node.getNodeOverLoadInfo.getSystemLeftMemory != null) AMEngineNodeVo.setSystemLeftMemory(node.getNodeOverLoadInfo.getSystemLeftMemory)
         }
@@ -145,14 +143,14 @@ object AMUtils {
     AMEngineNodeVos
   }
 
-  def createUnlimitedResource(): util.Map[String, Long] ={
+  def createUnlimitedResource(): util.Map[String, Long] = {
     val map = new util.HashMap[String,Long]()
     map.put("core", 128)
-    map.put("memory", 512*1024*1024*1024)
+    map.put("memory", 512L * 1024 * 1024 * 1024) // Long math: as Int this product wraps to 0
     map.put("instance", 512)
     map
   }
-  def createZeroResource(): util.Map[String, Long] ={
+  def createZeroResource(): util.Map[String, Long] = {
     val map = new util.HashMap[String,Long]()
     map.put("core", 1)
     map.put("memory", 512*1024*1024)
@@ -161,10 +159,10 @@ object AMUtils {
   }
 
   def isJson(str: String): Boolean = {
-    try{
+    try {
       GSON.fromJson(str, classOf[JsonObject])
       true
-    }catch {
-      case _ => false
+    } catch {
+      case scala.util.control.NonFatal(_) => false // parse failure => not JSON; fatal errors propagate
     }
   }
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
index 8a5666c72..3eee3b8ac 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
@@ -32,9 +32,10 @@ import org.apache.linkis.manager.rm.utils.RequestKerberosUrlUtils
 import org.json4s.JValue
 import org.json4s.JsonAST._
 import org.json4s.jackson.JsonMethods.parse
+
 import java.util
 import java.util.Base64
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
@@ -306,10 +307,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
     true
   }
 }
+
 object YarnResourceRequester extends Logging {
 
   private val httpClient = HttpClients.createDefault()
-  def init() = {
+  def init(): Unit = {
     addShutdownHook()
   }
 
diff --git a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
index 56617d149..30b74c20a 100644
--- a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
@@ -52,14 +52,30 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import okhttp3.OkHttpClient;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import io.prestosql.client.ClientSelectedRole;
+import io.prestosql.client.ClientSession;
+import io.prestosql.client.QueryError;
+import io.prestosql.client.QueryStatusInfo;
+import io.prestosql.client.SocketChannelSocketFactory;
+import io.prestosql.client.StatementClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import io.prestosql.client.*;
+import io.prestosql.client.StatementClientFactory;
 
 import java.io.IOException;
 import java.net.URI;
 import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.linkis.engineplugin.openlookeng.conf.OpenLooKengConfiguration.OPENLOOKENG_HTTP_CONNECT_TIME_OUT;
@@ -360,8 +376,11 @@ public class OpenLooKengEngineConnExecutor extends ConcurrentComputationExecutor
                 if (error.getFailureInfo() != null) {
                     cause = error.getFailureInfo().toException();
                 }
-                engineExecutorContext.appendStdout(
-                        LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause)));
+                // Render the stack trace only when a cause exists: ExceptionUtils.getStackTrace
+                // can throw a NullPointerException when it is handed a null Throwable.
+                String errorString =
+                        cause != null ? ExceptionUtils.getStackTrace(cause) : "";
+                engineExecutorContext.appendStdout(LogUtils.generateERROR(errorString));
                 return new ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause);
             }
         } else if (statement.isClientAborted()) {
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
index 497996252..2937bba6a 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
@@ -21,7 +21,6 @@ import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
 import org.apache.linkis.orchestrator.core.SessionState
 import org.apache.linkis.orchestrator.execution.{Execution, ExecutionFactory, TaskManager, TaskScheduler}
-import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
 
 /**
   *
@@ -47,7 +46,7 @@ abstract class AbstractExecutionFactory extends ExecutionFactory {
   }
 
   override protected def getTaskScheduler(): TaskScheduler = {
-    val executorService = Utils.newCachedThreadPool(OrchestratorConfiguration.TASK_SCHEDULER_THREAD_POOL.getValue,  "BaseTaskScheduler-Thread-")
+    val executorService = Utils.newCachedThreadPool(OrchestratorConfiguration.TASK_SCHEDULER_THREAD_POOL.getValue, "BaseTaskScheduler-Thread-")
     new BaseTaskScheduler(executorService)
   }
 
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala
index 47f973c1b..30327b4ca 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala
@@ -17,19 +17,14 @@
  
 package org.apache.linkis.orchestrator.execution.impl
 
-import java.util
-import java.util.concurrent.{ExecutorService, Future, TimeUnit}
-
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
-import org.apache.linkis.orchestrator.exception.OrchestratorRetryException
 import org.apache.linkis.orchestrator.execution.{ExecTaskRunner, TaskScheduler}
-import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
-import org.apache.linkis.orchestrator.listener.task.TaskReheaterEvent
-import org.apache.linkis.orchestrator.plans.physical.ExecTask
 
-import scala.collection.mutable.ArrayBuffer
+import java.util
+import java.util.concurrent.{ExecutorService, Future, TimeUnit}
 import scala.collection.convert.wrapAsScala._
+import scala.collection.mutable.ArrayBuffer
 
 /**
   *
@@ -78,17 +73,16 @@ class BaseTaskScheduler(executeService: ExecutorService) extends TaskScheduler w
     if (taskFutureCache.containsKey(task.task.getId)) {
       logger.info(s"from taskFutureCache to kill task Runner ${task.task.getIDInfo}")
       val future = taskFutureCache.get(task.task.getId)
-      if ( null != future && ! future.isDone) {
+      if (null != future && ! future.isDone) {
         future.cancel(interrupted)
       }
       taskFutureCache.remove(task.task.getId)
-      //taskIdTaskCache.remove(task.task.getId)
     }
   }
 
   override def close(): Unit = {
     taskFutureCache.foreach{ case (_, future) =>
-      if(future != null && !future.isDone) future.cancel(true)
+      if (future != null && !future.isDone) future.cancel(true)
     }
     taskFutureCache.clear()
    // taskIdTaskCache.clear()
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java
index 0a3d9d71d..bee5ba61c 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java
@@ -19,7 +19,7 @@ package org.apache.linkis.metadata.hive.dto;
 
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 
 public class MetadataQueryParam {
@@ -47,12 +47,12 @@ public class MetadataQueryParam {
     }
 
     public MetadataQueryParam() {
-        this.roles = Collections.emptyList();
+        this.roles = new ArrayList<>();
     }
 
     public MetadataQueryParam(String username) {
         this.userName = username;
-        this.roles = Collections.emptyList();
+        this.roles = new ArrayList<>();
     }
 
     public String getUserName() {
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala b/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala
index e461daffb..edc51e645 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala
@@ -91,5 +91,5 @@ object InstanceLabelClient {
 
   private val instanceLabelClient = new InstanceLabelClient
 
-  def getInstance = instanceLabelClient
+  def getInstance: InstanceLabelClient = instanceLabelClient
 }
\ No newline at end of file
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index edf868564..dcdb90ec5 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -40,8 +40,7 @@ import java.sql.Timestamp
 import java.{lang, util}
 import java.util.Date
 import java.util.concurrent.{Callable, TimeUnit}
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
 
 
 @Service
@@ -102,7 +101,7 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
         }
       }
       val jobUpdate = jobRequest2JobHistory(jobReq)
-      if(jobUpdate.getUpdatedTime == null) {
+      if (jobUpdate.getUpdatedTime == null) {
         throw new QueryException(120001,s"jobId:${jobReq.getId},更新job相关信息失败,请指定该请求的更新时间!")
       }
       logger.info(s"Update data to the database(往数据库中更新数据):task ${jobReq.getId} + status ${jobReq.getStatus}, updateTime: ${jobUpdate.getUpdateTimeMills}, progress : ${jobUpdate.getProgress}")
@@ -128,8 +127,8 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
     override def batchChange(jobReqUpdate: JobReqBatchUpdate): util.ArrayList[JobRespProtocol] = {
       val jobReqList = jobReqUpdate.jobReq
       val jobRespList = new util.ArrayList[JobRespProtocol]()
-      if(jobReqList != null){
-        jobReqList.foreach(jobReq =>{
+      if (jobReqList != null) {
+        jobReqList.asScala.foreach(jobReq => {
           jobReq.setExecutionCode(null)
           logger.info("Update data to the database(往数据库中更新数据):status:" + jobReq.getStatus )
           val jobResp = new JobRespProtocol
@@ -218,7 +217,7 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
 
   override def search(jobId: java.lang.Long, username: String, status: String, creator: String, sDate: Date, eDate: Date, engineType: String, startJobId: java.lang.Long): util.List[JobHistory] = {
 
-    val split: util.List[String] = if (status != null) status.split(",").toList else null
+    val split: util.List[String] = if (status != null) status.split(",").toList.asJava else null
     val result = if (StringUtils.isBlank(creator)) {
       jobHistoryMapper.search(jobId, username, split, sDate, eDate, engineType, startJobId)
     } else if (StringUtils.isBlank(username)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org