You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/07/28 07:45:34 UTC
[incubator-linkis] branch dev-1.2.0 updated: fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style (#2536)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new d4cfbac12 fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style (#2536)
d4cfbac12 is described below
commit d4cfbac12a30ff3dec2567eee24ebaf0de5c4187
Author: Jack Xu <xu...@126.com>
AuthorDate: Thu Jul 28 15:45:28 2022 +0800
fix: fix the ExceptionUtils.getStackTrace NPE, and format some code style (#2536)
* fix: fix the ExceptionUtils.getStackTrace NPE, add format some code sytle
---
.../org/apache/linkis/common/log/LogUtils.scala | 17 ++++++------
.../org/apache/linkis/common/utils/JsonUtils.scala | 2 --
.../scala/org/apache/linkis/server/Message.scala | 27 +++++++++----------
.../org/apache/linkis/rpc/BaseRPCSender.scala | 14 +++++-----
.../org/apache/linkis/rpc/RPCReceiveRestful.scala | 21 ++++++++-------
.../org/apache/linkis/rpc/RPCSpringBeanCache.scala | 13 +++++----
.../org/apache/linkis/rpc/ReceiverChooser.scala | 4 +--
.../SpringCloudFeignConfigurationCache.scala | 4 +--
.../linkis/rpc/sender/SpringMVCRPCSender.scala | 10 +++----
.../org/apache/linkis/scheduler/queue/Job.scala | 31 +++++++++++-----------
.../async/AsyncConcurrentComputationExecutor.scala | 5 +++-
.../service/DefaultExecutorHeartbeatService.scala | 2 +-
.../persistence/QueryPersistenceManager.java | 11 +-------
.../linkis/entrance/execute/EntranceExecutor.scala | 4 ++-
.../apache/linkis/manager/am/utils/AMUtils.scala | 24 ++++++++---------
.../rm/external/yarn/YarnResourceRequester.scala | 6 +++--
.../executor/OpenLooKengEngineConnExecutor.java | 27 ++++++++++++++++---
.../execution/impl/AbstractExecutionFactory.scala | 3 +--
.../execution/impl/BaseTaskScheduler.scala | 16 ++++-------
.../metadata/hive/dto/MetadataQueryParam.java | 6 ++---
.../label/client/InstanceLabelClient.scala | 2 +-
.../service/impl/JobHistoryQueryServiceImpl.scala | 11 ++++----
22 files changed, 134 insertions(+), 126 deletions(-)
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
index 9b66291c5..0a776b0ef 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/log/LogUtils.scala
@@ -25,34 +25,33 @@ import java.util.Date
object LogUtils {
- private def getTimeFormat:String = {
- val simpleDateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.mmm")
+ private def getTimeFormat: String = {
+ val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.mmm")
val now = new Date(System.currentTimeMillis())
simpleDateFormat.format(now)
- //now.toString(ISODateTimeFormat.yearMonthDay()) + " " + now.toString(ISODateTimeFormat.hourMinuteSecondMillis())
}
- def generateInfo(rawLog:String):String = {
+ def generateInfo(rawLog: String): String = {
getTimeFormat + " " + "INFO" + " " + rawLog
}
- def generateERROR(rawLog:String):String = {
+ def generateERROR(rawLog: String): String = {
getTimeFormat + " " + "ERROR" + " " + rawLog
}
- def generateWarn(rawLog:String):String = {
+ def generateWarn(rawLog: String): String = {
getTimeFormat + " " + "WARN" + " " + rawLog
}
- def generateSystemInfo(rawLog:String):String = {
+ def generateSystemInfo(rawLog: String): String = {
getTimeFormat + " " + "SYSTEM-INFO" + " " + rawLog
}
- def generateSystemError(rawLog:String):String = {
+ def generateSystemError(rawLog: String): String = {
getTimeFormat + " " + "SYSTEM-ERROR" + " " + rawLog
}
- def generateSystemWarn(rawLog:String):String = {
+ def generateSystemWarn(rawLog: String): String = {
getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
}
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala
index 44a2d1f13..05ac7bd21 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/JsonUtils.scala
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper
object JsonUtils {
- //TODO add gson
-
implicit val jackson = new ObjectMapper().setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"))
}
diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala
index c1f8686af..d603861f3 100644
--- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala
+++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Message.scala
@@ -17,14 +17,13 @@
package org.apache.linkis.server
-import java.util
-import javax.servlet.http.HttpServletRequest
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.linkis.common.utils.Logging
-
import org.springframework.web.context.request.{RequestContextHolder, ServletRequestAttributes}
+import java.util
+import javax.servlet.http.HttpServletRequest
+
class Message(private var method: String,
private var status: Int = 0, //-1 no login, 0 success, 1 error, 2 validate failed, 3 auth failed, 4 warning
@@ -59,13 +58,10 @@ class Message(private var method: String,
def setData(data: util.HashMap[String, Object]): Unit = this.data = data
def getData: util.HashMap[String, Object] = data
- // def isSuccess = status == 0
- // def isError = status != 0
-
override def toString: String = s"Message($getMethod, $getStatus, $getData)"
}
-object Message extends Logging {
+object Message {
def apply(method: String = null, status: Int = 0, message: String = null,
data: util.HashMap[String, Object] = new util.HashMap[String, Object]): Message = {
@@ -94,17 +90,21 @@ object Message extends Logging {
}
def error(msg: String): Message = error(msg, null)
implicit def error(t: Throwable): Message = {
- Message(status = 1).setMessage(ExceptionUtils.getRootCauseMessage(t)) << ("stack", ExceptionUtils.getStackTrace(t))
+ error(ExceptionUtils.getRootCauseMessage(t), t)
}
+
implicit def error(e: (String, Throwable)): Message = error(e._1, e._2)
implicit def error(msg: String, t: Throwable): Message = {
- val message = Message(status = 1)
+ error(msg, t, MessageStatus.ERROR)
+ }
+ implicit def error(msg: String, t: Throwable, status: Int): Message = {
+ val message = Message(status = status)
message.setMessage(msg)
if(t != null) message << ("stack", ExceptionUtils.getStackTrace(t))
message
}
implicit def warn(msg: String): Message = {
- val message = Message(status = 4)
+ val message = Message(status = MessageStatus.WARNING)
message.setMessage(msg)
message
}
@@ -121,10 +121,7 @@ object Message extends Logging {
}
def noLogin(msg: String, t: Throwable): Message = {
- val message = Message(status = -1)
- message.setMessage(msg)
- if(t != null) message << ("stack", ExceptionUtils.getStackTrace(t))
- message
+ error(msg, t, MessageStatus.NO_LOGIN)
}
def noLogin(msg: String): Message = noLogin(msg, null)
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
index 94f3e2912..b41c397af 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala
@@ -67,7 +67,7 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
protected def newRPC: RPCReceiveRemote = {
val builder = Feign.builder.logger(new Slf4jLogger()).logLevel(feign.Logger.Level.FULL)
doBuilder(builder)
- var url = if(name.startsWith("http://")) name else "http://" + name
+ var url = if (name.startsWith("http://")) name else "http://" + name
if(url.endsWith("/")) url = url.substring(0, url.length - 1)
url += ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue
builder.target(classOf[RPCReceiveRemote], url)
@@ -80,14 +80,14 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
case _ => op
}
- override def ask(message: Any): Any = execute(message){
+ override def ask(message: Any): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReply(msg)
RPCConsumer.getRPCConsumer.toObject(response)
}
- override def ask(message: Any, timeout: Duration): Any = execute(message){
+ override def ask(message: Any, timeout: Duration): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
msg.data("duration", timeout.toMillis)
BaseRPCSender.addInstanceInfo(msg.getData)
@@ -95,7 +95,7 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
RPCConsumer.getRPCConsumer.toObject(response)
}
- private def sendIt(message: Any, op: Message => Message): Unit = execute(message){
+ private def sendIt(message: Any, op: Message => Message): Unit = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
BaseRPCSender.addInstanceInfo(msg.getData)
RPCConsumer.getRPCConsumer.toObject(op(msg)) match {
@@ -118,13 +118,13 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
protected def getRPCSenderListenerBus = BaseRPCSender.rpcSenderListenerBus
- override def equals(obj: scala.Any): Boolean = if(obj == null) false
+ override def equals(obj: scala.Any): Boolean = if (obj == null) false
else obj match {
case sender: BaseRPCSender => name == sender.name
case _ => false
}
- override def hashCode(): Int = if(name == null) 0 else name.hashCode
+ override def hashCode(): Int = if (name == null) 0 else name.hashCode
override def toString: String = s"RPCSender($name)"
}
@@ -139,7 +139,7 @@ private[rpc] object BaseRPCSender extends Logging {
override def onMessageEventError(event: RPCMessageEvent, t: Throwable): Unit =
logger.warn(s"${event.serviceInstance} deliver RPC message failed! Message: " + event.message, t)
})
- def addInstanceInfo[T](map: util.Map[String, T]): Unit ={
+ def addInstanceInfo[T](map: util.Map[String, T]): Unit = {
map.put("name", DataWorkCloudApplication.getApplicationName.asInstanceOf[T])
map.put("instance", DataWorkCloudApplication.getInstance.asInstanceOf[T])
}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
index 05669ef13..2e166738d 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCReceiveRestful.scala
@@ -64,14 +64,17 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
}
@PostConstruct
- def initListenerBus(): Unit = {
- if(!receiverChoosers.exists(_.isInstanceOf[CommonReceiverChooser]))
+ def initListenerBus(): Unit = {
+ if (!receiverChoosers.exists(_.isInstanceOf[CommonReceiverChooser])) {
receiverChoosers = receiverChoosers :+ new CommonReceiverChooser
- if(!receiverChoosers.exists(_.isInstanceOf[MessageReceiverChooser]))
+ }
+ if (!receiverChoosers.exists(_.isInstanceOf[MessageReceiverChooser])) {
receiverChoosers = receiverChoosers :+ new MessageReceiverChooser
+ }
logger.info("init all receiverChoosers in spring beans, list => " + receiverChoosers.toList)
- if(!receiverSenderBuilders.exists(_.isInstanceOf[CommonReceiverSenderBuilder]))
+ if (!receiverSenderBuilders.exists(_.isInstanceOf[CommonReceiverSenderBuilder])) {
receiverSenderBuilders = receiverSenderBuilders :+ new CommonReceiverSenderBuilder
+ }
receiverSenderBuilders = receiverSenderBuilders.sortBy(_.order)
logger.info("init all receiverSenderBuilders in spring beans, list => " + receiverSenderBuilders.toList)
val queueSize = BDP_RPC_RECEIVER_ASYN_QUEUE_CAPACITY.acquireNew
@@ -93,7 +96,7 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
rpcReceiverListenerBus.start()
}
- private def addBroadcastListener(broadcastListener: BroadcastListener): Unit = if(rpcReceiverListenerBus != null) {
+ private def addBroadcastListener(broadcastListener: BroadcastListener): Unit = if (rpcReceiverListenerBus != null) {
logger.info("add a new RPCBroadcastListener => " + broadcastListener.getClass)
rpcReceiverListenerBus.addListener(new RPCMessageEventListener {
val listenerName = broadcastListener.getClass.getSimpleName
@@ -114,7 +117,7 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
RPCProduct.getRPCProduct.toMessage(obj)
}
- @RequestMapping(path = Array("/rpc/receive"),method = Array(RequestMethod.POST))
+ @RequestMapping(path = Array("/rpc/receive"), method = Array(RequestMethod.POST))
override def receive(@RequestBody message: Message): Message = catchIt {
val obj = RPCConsumer.getRPCConsumer.toObject(message)
val event = RPCMessageEvent(obj, BaseRPCSender.getInstanceInfo(message.getData))
@@ -128,13 +131,13 @@ private[rpc] class RPCReceiveRestful extends RPCReceiveRemote with Logging {
event.map(opEvent(_, obj, event)).getOrElse(RPCProduct.getRPCProduct.notFound())
}
- @RequestMapping(path = Array("/rpc/receiveAndReply"),method = Array(RequestMethod.POST))
+ @RequestMapping(path = Array("/rpc/receiveAndReply"), method = Array(RequestMethod.POST))
override def receiveAndReply(@RequestBody message: Message): Message = receiveAndReply(message, _.receiveAndReply(_, _))
- @RequestMapping(path = Array("/rpc/replyInMills"),method = Array(RequestMethod.POST))
+ @RequestMapping(path = Array("/rpc/replyInMills"), method = Array(RequestMethod.POST))
override def receiveAndReplyInMills(@RequestBody message: Message): Message = catchIt {
val duration = message.getData.get("duration")
- if(duration == null || StringUtils.isEmpty(duration.toString)) throw new DWCURIException(10002, "The timeout period is not set!(超时时间未设置!)")
+ if (duration == null || StringUtils.isEmpty(duration.toString)) throw new DWCURIException(10002, "The timeout period is not set!(超时时间未设置!)")
val timeout = Duration(duration.toString.toLong, TimeUnit.MILLISECONDS)
receiveAndReply(message, _.receiveAndReply(_, timeout, _))
}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
index 32fea4cc8..e26a63d0c 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/RPCSpringBeanCache.scala
@@ -37,29 +37,32 @@ private[rpc] object RPCSpringBeanCache extends Logging {
private var rpcReceiveRestful: RPCReceiveRestful = _
def registerReceiver(receiverName: String, receiver: Receiver): Unit = {
- if(beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
+ if (beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
logger.info(s"register a new receiver with name $receiverName, receiver is " + receiver)
beanNameToReceivers synchronized beanNameToReceivers.put(receiverName, receiver)
}
def registerReceiverChooser(receiverChooser: ReceiverChooser): Unit = {
- if(rpcReceiveRestful == null)
+ if (rpcReceiveRestful == null) {
rpcReceiveRestful = getApplicationContext.getBean(classOf[RPCReceiveRestful])
+ }
rpcReceiveRestful.registerReceiverChooser(receiverChooser)
}
def registerBroadcastListener(broadcastListener: BroadcastListener): Unit = {
- if(rpcReceiveRestful == null)
+ if (rpcReceiveRestful == null) {
rpcReceiveRestful = getApplicationContext.getBean(classOf[RPCReceiveRestful])
+ }
rpcReceiveRestful.registerBroadcastListener(broadcastListener)
}
def getRPCReceiveRestful: RPCReceiveRestful = {
- if(rpcReceiveRestful == null)
+ if (rpcReceiveRestful == null) {
rpcReceiveRestful = getApplicationContext.getBean(classOf[RPCReceiveRestful])
+ }
rpcReceiveRestful
}
private[rpc] def getReceivers: util.Map[String, Receiver] = {
- if(beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
+ if (beanNameToReceivers == null) beanNameToReceivers = getApplicationContext.getBeansOfType(classOf[Receiver])
beanNameToReceivers
}
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala
index 285df4f1d..1012e1367 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/ReceiverChooser.scala
@@ -29,10 +29,10 @@ trait ReceiverChooser {
}
class CommonReceiverChooser extends ReceiverChooser {
- override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = if(getReceivers.size() == 1) Some(getReceivers.values.iterator.next)
+ override def chooseReceiver(event: RPCMessageEvent): Option[Receiver] = if (getReceivers.size() == 1) Some(getReceivers.values.iterator.next)
else {
var receiver = getReceivers.get(event.serviceInstance.getApplicationName)
- if(receiver == null) receiver = getReceivers.get("receiver")
+ if (receiver == null) receiver = getReceivers.get("receiver")
Option(receiver)
}
}
\ No newline at end of file
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala
index 45c4aee46..667dd52ff 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringCloudFeignConfigurationCache.scala
@@ -71,13 +71,13 @@ private[linkis] object SpringCloudFeignConfigurationCache {
private[rpc] def getDecoder = decoder
private[rpc] def getContract = contract
private[rpc] def getClient = {
- if(client == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
+ if (client == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
client
}
private[rpc] def getClientFactory = clientFactory
private[rpc] def getLoadBalancedRetryFactory = loadBalancedRetryFactory
private[linkis] def getDiscoveryClient = {
- if(discoveryClient == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
+ if (discoveryClient == null) DataWorkCloudApplication.getApplicationContext.getBean(classOf[SpringCloudFeignConfigurationCache])
discoveryClient
}
private[rpc] def getRPCTicketIdRequestInterceptor = rpcTicketIdRequestInterceptor
diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
index d1db13a14..e5784d95f 100644
--- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
+++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala
@@ -55,7 +55,7 @@ private[rpc] class SpringMVCRPCSender private[rpc](private[rpc] val serviceInsta
new FeignLoadBalancer(getClientFactory.getLoadBalancer(clientName), getClientFactory.getClientConfig(clientName), serverIntrospector) {
override def customizeLoadBalancerCommandBuilder(request: FeignLoadBalancer.RibbonRequest, config: IClientConfig,
builder: LoadBalancerCommand.Builder[FeignLoadBalancer.RibbonResponse]): Unit = {
- val instance = if(getRPCLoadBalancers.isEmpty) None else {
+ val instance = if (getRPCLoadBalancers.isEmpty) None else {
val requestBody = SpringMVCRPCSender.getRequest(request).body()
val requestStr = new String(requestBody, DWCConfiguration.BDP_ENCODING.getValue)
val obj = RPCConsumer.getRPCConsumer.toObject(BDPJettyServerHelper.gson.fromJson(requestStr, classOf[Message]))
@@ -97,7 +97,7 @@ private[rpc] class SpringMVCRPCSender private[rpc](private[rpc] val serviceInsta
*/
override def deliver(message: Any): Unit = getRPCSenderListenerBus.post(RPCMessageEvent(message, serviceInstance))
- override def equals(obj: Any): Boolean = if(obj == null) false
+ override def equals(obj: Any): Boolean = if (obj == null) false
else obj match {
case sender: SpringMVCRPCSender => sender.serviceInstance == serviceInstance
case _ => false
@@ -105,14 +105,14 @@ private[rpc] class SpringMVCRPCSender private[rpc](private[rpc] val serviceInsta
override def hashCode(): Int = serviceInstance.hashCode()
- override val toString: String = if(StringUtils.isBlank(serviceInstance.getInstance)) s"RPCSender(${serviceInstance.getApplicationName})"
+ override val toString: String = if (StringUtils.isBlank(serviceInstance.getInstance)) s"RPCSender(${serviceInstance.getApplicationName})"
else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})"
}
private object SpringMVCRPCSender {
private var requestField: Field = _
def getRequest(req: ClientRequest): Request = {
- if(requestField == null) synchronized {
- if(requestField == null) {
+ if (requestField == null) synchronized {
+ if (requestField == null) {
requestField = req.getClass.getDeclaredField("request")
requestField.setAccessible(true)
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
index b4e95f093..74d498760 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
@@ -54,8 +54,9 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
private var retryNum = 0
private[linkis] var errorExecuteResponse: ErrorExecuteResponse = _
- override def isWaiting = super.isWaiting && !interrupt
- override def isCompleted = super.isCompleted || interrupt
+ override def isWaiting: Boolean = super.isWaiting && !interrupt
+
+ override def isCompleted: Boolean = super.isCompleted || interrupt
def kill(): Unit = onFailure("Job is killed by user!", null)
@@ -129,27 +130,27 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
}
- def setListenerEventBus(eventListenerBus: ListenerEventBus[_<: SchedulerListener, _<: ScheduleEvent]) = this.eventListenerBus = eventListenerBus
+ def setListenerEventBus(eventListenerBus: ListenerEventBus[_ <: SchedulerListener, _ <: ScheduleEvent]): Unit = this.eventListenerBus = eventListenerBus
- def setExecutor(executor: Executor) = this.executor = executor
+ def setExecutor(executor: Executor): Unit = this.executor = executor
protected def getExecutor = executor
- def setJobListener(jobListener: JobListener) = this.jobListener = Some(jobListener)
+ def setJobListener(jobListener: JobListener): Unit = this.jobListener = Some(jobListener)
- def getJobListener = jobListener
+ def getJobListener: Option[JobListener] = jobListener
- def setLogListener(logListener: LogListener) = this.logListener = Some(logListener)
+ def setLogListener(logListener: LogListener): Unit = this.logListener = Some(logListener)
- def getLogListener = logListener
+ def getLogListener: Option[LogListener] = logListener
- def setProgressListener(progressListener: ProgressListener) = this.progressListener = Some(progressListener)
+ def setProgressListener(progressListener: ProgressListener): Unit = this.progressListener = Some(progressListener)
- def getProgressListener = progressListener
+ def getProgressListener: Option[ProgressListener] = progressListener
- def getProgress = progress
+ def getProgress: Float = progress
- def setProgress(progress: Float) = this.progress = progress
+ def setProgress(progress: Float): Unit = this.progress = progress
@throws[Exception]
def init(): Unit
@@ -161,7 +162,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
def getJobInfo: JobInfo
- def getErrorResponse = errorExecuteResponse
+ def getErrorResponse: ErrorExecuteResponse = errorExecuteResponse
protected def existsJobDaemon: Boolean = false
@@ -223,7 +224,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
IOUtils.closeQuietly(this)
}
def isJobSupportRetry: Boolean = true
- def getRetryNum = retryNum
+ def getRetryNum: Int = retryNum
protected def getMaxRetryNum: Int = 2
protected def isJobShouldRetry(errorExecuteResponse: ErrorExecuteResponse): Boolean =
isJobSupportRetry && errorExecuteResponse != null && (errorExecuteResponse.t match {
@@ -261,7 +262,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
close()
return
}
- val rs = Utils.tryCatch(executor.execute(jobToExecuteRequest)){
+ val rs = Utils.tryCatch(executor.execute(jobToExecuteRequest)) {
case t: InterruptedException =>
logger.warn(s"job $toString is interrupted by user!", t)
ErrorExecuteResponse("job is interrupted by user!", t)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
index 7091d259b..67ac2d448 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
@@ -99,7 +99,10 @@ abstract class AsyncConcurrentComputationExecutor(override val outputPrintLimit:
response match {
case e: ErrorExecuteResponse =>
logger.error("execute code failed!", e.t)
- engineExecutionContext.appendStdout(LogUtils.generateERROR(s"execute code failed!: ${ExceptionUtils.getStackTrace(e.t)}"))
+ val errorStr = if (e.t != null) {
+ ExceptionUtils.getStackTrace(e.t)
+ } else StringUtils.EMPTY
+ engineExecutionContext.appendStdout(LogUtils.generateERROR(s"execute code failed!: $errorStr"))
case SuccessExecuteResponse() =>
logger.info(s"task{${engineConnTask.getTaskId} execute success")
case e: OutputExecuteResponse =>
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
index 837a3a0dd..0fdda710f 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
@@ -58,7 +58,7 @@ class DefaultExecutorHeartbeatService extends ExecutorHeartbeatService with Node
val heartbeatTime = AccessibleExecutorConfiguration.ENGINECONN_HEARTBEAT_TIME.getValue.toLong
Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryAndWarn {
- if (EngineConnObject.isReady){
+ if (EngineConnObject.isReady) {
val executor = ExecutorManager.getInstance.getReportExecutor
reportHeartBeatMsg(executor)
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index f3140db74..1f71e198d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -40,13 +40,11 @@ import scala.Option;
import scala.Tuple2;
public class QueryPersistenceManager extends PersistenceManager {
+ private static final Logger logger = LoggerFactory.getLogger(QueryPersistenceManager.class);
private EntranceContext entranceContext;
private PersistenceEngine persistenceEngine;
private ResultSetEngine resultSetEngine;
- private static final Logger logger = LoggerFactory.getLogger(QueryPersistenceManager.class);
- // private EntranceWebSocketService entranceWebSocketService; //TODO The latter version, to be
- // removed, webSocket unified walk ListenerBus(后面的版本,要去掉,webSocket统一走ListenerBus)
private CliHeartbeatMonitor cliHeartbeatMonitor;
@@ -66,13 +64,6 @@ public class QueryPersistenceManager extends PersistenceManager {
this.resultSetEngine = resultSetEngine;
}
- // TODO The latter version, to be removed, webSocket unified walk
- // ListenerBus(后面的版本,要去掉,webSocket统一走ListenerBus)
- // public void setEntranceWebSocketService(EntranceWebSocketService entranceWebSocketService)
- // {
- // this.entranceWebSocketService = entranceWebSocketService;
- // }
-
@Override
public EntranceContext getEntranceContext() {
return entranceContext;
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
index 83a386935..d3b0a11aa 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.entrance.execute
import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
@@ -181,7 +182,8 @@ class EngineExecuteAsyncReturn(val request: ExecuteRequest,
case entranceExecuteRequest: EntranceExecuteRequest =>
r match {
case ErrorExecuteResponse(errorMsg, error) =>
- val msg = s"Job with execId-$id + subJobId : $subJobId execute failed,$errorMsg \n ${ExceptionUtils.getStackTrace(error)}"
+ val errorStackTrace = if (error != null) ExceptionUtils.getStackTrace(error) else StringUtils.EMPTY
+ val msg = s"Job with execId-$id + subJobId : $subJobId execute failed,$errorMsg \n $errorStackTrace"
entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateERROR(msg)))
case _ =>
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
index 0bca10dad..397035d9a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
@@ -17,20 +17,18 @@
package org.apache.linkis.manager.am.utils
-import java.{lang, util}
-
-import com.google.gson.{Gson, JsonObject}
+import com.google.gson.JsonObject
import org.apache.linkis.manager.am.vo.{AMEngineNodeVo, EMNodeVo}
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode}
-import org.apache.linkis.manager.common.entity.resource.{DriverAndYarnResource, LoadInstanceResource, Resource, ResourceSerializer, ResourceType}
+import org.apache.linkis.manager.common.entity.resource.{DriverAndYarnResource, Resource, ResourceSerializer, ResourceType}
import org.apache.linkis.manager.common.serializer.NodeResourceSerializer
-import org.apache.linkis.manager.common.utils.ResourceUtils
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel
import org.apache.linkis.server.BDPJettyServerHelper
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write
+import java.util
import scala.collection.JavaConverters._
object AMUtils {
@@ -40,7 +38,7 @@ object AMUtils {
implicit val formats = DefaultFormats + ResourceSerializer + NodeResourceSerializer
val mapper = BDPJettyServerHelper.jacksonJson
- def copyToEMVo(EMNodes: Array[EMNode]): util.ArrayList[EMNodeVo]= {
+ def copyToEMVo(EMNodes: Array[EMNode]): util.ArrayList[EMNodeVo] = {
val EMNodeVos = new util.ArrayList[EMNodeVo]()
EMNodes.foreach(node => {
val EMNodeVo = new EMNodeVo
@@ -129,9 +127,9 @@ object AMUtils {
if(node.getNodeTaskInfo.getSucceedTasks != null) AMEngineNodeVo.setSucceedTasks(node.getNodeTaskInfo.getSucceedTasks)
if(node.getNodeTaskInfo.getFailedTasks != null) AMEngineNodeVo.setFailedTasks(node.getNodeTaskInfo.getFailedTasks)
}
- if(node.getNodeOverLoadInfo != null){
- if(node.getNodeOverLoadInfo.getMaxMemory != null) AMEngineNodeVo.setMaxMemory(node.getNodeOverLoadInfo.getMaxMemory)
- if(node.getNodeOverLoadInfo.getUsedMemory != null) AMEngineNodeVo.setUsedMemory(node.getNodeOverLoadInfo.getUsedMemory)
+ if (node.getNodeOverLoadInfo != null){
+ if (node.getNodeOverLoadInfo.getMaxMemory != null) AMEngineNodeVo.setMaxMemory(node.getNodeOverLoadInfo.getMaxMemory)
+ if (node.getNodeOverLoadInfo.getUsedMemory != null) AMEngineNodeVo.setUsedMemory(node.getNodeOverLoadInfo.getUsedMemory)
if(node.getNodeOverLoadInfo.getSystemCPUUsed != null) AMEngineNodeVo.setSystemCPUUsed(node.getNodeOverLoadInfo.getSystemCPUUsed)
if(node.getNodeOverLoadInfo.getSystemLeftMemory != null) AMEngineNodeVo.setSystemLeftMemory(node.getNodeOverLoadInfo.getSystemLeftMemory)
}
@@ -145,14 +143,14 @@ object AMUtils {
AMEngineNodeVos
}
- def createUnlimitedResource(): util.Map[String, Long] ={
+ def createUnlimitedResource(): util.Map[String, Long] = {
val map = new util.HashMap[String,Long]()
map.put("core", 128)
map.put("memory", 512*1024*1024*1024)
map.put("instance", 512)
map
}
- def createZeroResource(): util.Map[String, Long] ={
+ def createZeroResource(): util.Map[String, Long] = {
val map = new util.HashMap[String,Long]()
map.put("core", 1)
map.put("memory", 512*1024*1024)
@@ -161,10 +159,10 @@ object AMUtils {
}
def isJson(str: String): Boolean = {
- try{
+ try {
GSON.fromJson(str, classOf[JsonObject])
true
- }catch {
+ } catch {
case _ => false
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
index 8a5666c72..3eee3b8ac 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
@@ -32,9 +32,10 @@ import org.apache.linkis.manager.rm.utils.RequestKerberosUrlUtils
import org.json4s.JValue
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods.parse
+
import java.util
import java.util.Base64
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
@@ -306,10 +307,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
true
}
}
+
object YarnResourceRequester extends Logging {
private val httpClient = HttpClients.createDefault()
- def init() = {
+ def init(): Unit = {
addShutdownHook()
}
diff --git a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
index 56617d149..30b74c20a 100644
--- a/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
+++ b/linkis-engineconn-plugins/openlookeng/src/main/java/org/apache/linkis/engineplugin/openlookeng/executor/OpenLooKengEngineConnExecutor.java
@@ -52,14 +52,30 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import okhttp3.OkHttpClient;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import io.prestosql.client.ClientSelectedRole;
+import io.prestosql.client.ClientSession;
+import io.prestosql.client.QueryError;
+import io.prestosql.client.QueryStatusInfo;
+import io.prestosql.client.SocketChannelSocketFactory;
+import io.prestosql.client.StatementClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.prestosql.client.*;
+import io.prestosql.client.StatementClientFactory;
import java.io.IOException;
import java.net.URI;
import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import static org.apache.linkis.engineplugin.openlookeng.conf.OpenLooKengConfiguration.OPENLOOKENG_HTTP_CONNECT_TIME_OUT;
@@ -360,8 +376,11 @@ public class OpenLooKengEngineConnExecutor extends ConcurrentComputationExecutor
if (error.getFailureInfo() != null) {
cause = error.getFailureInfo().toException();
}
- engineExecutorContext.appendStdout(
- LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause)));
+ String errorString = "";
+ if (cause == null) {
+ errorString = ExceptionUtils.getStackTrace(cause);
+ }
+ engineExecutorContext.appendStdout(LogUtils.generateERROR(errorString));
return new ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause);
}
} else if (statement.isClientAborted()) {
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
index 497996252..2937bba6a 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
@@ -21,7 +21,6 @@ import org.apache.linkis.common.utils.Utils
import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
import org.apache.linkis.orchestrator.core.SessionState
import org.apache.linkis.orchestrator.execution.{Execution, ExecutionFactory, TaskManager, TaskScheduler}
-import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
/**
*
@@ -47,7 +46,7 @@ abstract class AbstractExecutionFactory extends ExecutionFactory {
}
override protected def getTaskScheduler(): TaskScheduler = {
- val executorService = Utils.newCachedThreadPool(OrchestratorConfiguration.TASK_SCHEDULER_THREAD_POOL.getValue, "BaseTaskScheduler-Thread-")
+ val executorService = Utils.newCachedThreadPool(OrchestratorConfiguration.TASK_SCHEDULER_THREAD_POOL.getValue, "BaseTaskScheduler-Thread-")
new BaseTaskScheduler(executorService)
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala
index 47f973c1b..30327b4ca 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/BaseTaskScheduler.scala
@@ -17,19 +17,14 @@
package org.apache.linkis.orchestrator.execution.impl
-import java.util
-import java.util.concurrent.{ExecutorService, Future, TimeUnit}
-
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
-import org.apache.linkis.orchestrator.exception.OrchestratorRetryException
import org.apache.linkis.orchestrator.execution.{ExecTaskRunner, TaskScheduler}
-import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
-import org.apache.linkis.orchestrator.listener.task.TaskReheaterEvent
-import org.apache.linkis.orchestrator.plans.physical.ExecTask
-import scala.collection.mutable.ArrayBuffer
+import java.util
+import java.util.concurrent.{ExecutorService, Future, TimeUnit}
import scala.collection.convert.wrapAsScala._
+import scala.collection.mutable.ArrayBuffer
/**
*
@@ -78,17 +73,16 @@ class BaseTaskScheduler(executeService: ExecutorService) extends TaskScheduler w
if (taskFutureCache.containsKey(task.task.getId)) {
logger.info(s"from taskFutureCache to kill task Runner ${task.task.getIDInfo}")
val future = taskFutureCache.get(task.task.getId)
- if ( null != future && ! future.isDone) {
+ if (null != future && ! future.isDone) {
future.cancel(interrupted)
}
taskFutureCache.remove(task.task.getId)
- //taskIdTaskCache.remove(task.task.getId)
}
}
override def close(): Unit = {
taskFutureCache.foreach{ case (_, future) =>
- if(future != null && !future.isDone) future.cancel(true)
+ if (future != null && !future.isDone) future.cancel(true)
}
taskFutureCache.clear()
// taskIdTaskCache.clear()
diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java
index 0a3d9d71d..bee5ba61c 100644
--- a/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java
+++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/java/org/apache/linkis/metadata/hive/dto/MetadataQueryParam.java
@@ -19,7 +19,7 @@ package org.apache.linkis.metadata.hive.dto;
import org.apache.commons.lang3.StringUtils;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
public class MetadataQueryParam {
@@ -47,12 +47,12 @@ public class MetadataQueryParam {
}
public MetadataQueryParam() {
- this.roles = Collections.emptyList();
+ this.roles = new ArrayList<>();
}
public MetadataQueryParam(String username) {
this.userName = username;
- this.roles = Collections.emptyList();
+ this.roles = new ArrayList<>();
}
public String getUserName() {
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala b/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala
index e461daffb..edc51e645 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-instance-label/linkis-instance-label-client/src/main/scala/org/apache/linkis/instance/label/client/InstanceLabelClient.scala
@@ -91,5 +91,5 @@ object InstanceLabelClient {
private val instanceLabelClient = new InstanceLabelClient
- def getInstance = instanceLabelClient
+ def getInstance: InstanceLabelClient = instanceLabelClient
}
\ No newline at end of file
diff --git a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index edf868564..dcdb90ec5 100644
--- a/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-publicservice/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -40,8 +40,7 @@ import java.sql.Timestamp
import java.{lang, util}
import java.util.Date
import java.util.concurrent.{Callable, TimeUnit}
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
@Service
@@ -102,7 +101,7 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
}
}
val jobUpdate = jobRequest2JobHistory(jobReq)
- if(jobUpdate.getUpdatedTime == null) {
+ if (jobUpdate.getUpdatedTime == null) {
throw new QueryException(120001,s"jobId:${jobReq.getId},更新job相关信息失败,请指定该请求的更新时间!")
}
logger.info(s"Update data to the database(往数据库中更新数据):task ${jobReq.getId} + status ${jobReq.getStatus}, updateTime: ${jobUpdate.getUpdateTimeMills}, progress : ${jobUpdate.getProgress}")
@@ -128,8 +127,8 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
override def batchChange(jobReqUpdate: JobReqBatchUpdate): util.ArrayList[JobRespProtocol] = {
val jobReqList = jobReqUpdate.jobReq
val jobRespList = new util.ArrayList[JobRespProtocol]()
- if(jobReqList != null){
- jobReqList.foreach(jobReq =>{
+ if (jobReqList != null) {
+ jobReqList.asScala.foreach(jobReq => {
jobReq.setExecutionCode(null)
logger.info("Update data to the database(往数据库中更新数据):status:" + jobReq.getStatus )
val jobResp = new JobRespProtocol
@@ -218,7 +217,7 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
override def search(jobId: java.lang.Long, username: String, status: String, creator: String, sDate: Date, eDate: Date, engineType: String, startJobId: java.lang.Long): util.List[JobHistory] = {
- val split: util.List[String] = if (status != null) status.split(",").toList else null
+ val split: util.List[String] = if (status != null) status.split(",").toList.asJava else null
val result = if (StringUtils.isBlank(creator)) {
jobHistoryMapper.search(jobId, username, split, sDate, eDate, engineType, startJobId)
} else if (StringUtils.isBlank(username)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org