You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/03 06:12:11 UTC
[incubator-linkis] branch dev-1.3.1 updated: feat:Scala code format alarm clear in linkis-scheduler (#3178)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 91b3316b2 feat:Scala code format alarm clear in linkis-scheduler (#3178)
91b3316b2 is described below
commit 91b3316b27433dc33b54e6afe592ecdb9664e84a
Author: ruY <43...@users.noreply.github.com>
AuthorDate: Sat Sep 3 14:12:06 2022 +0800
feat:Scala code format alarm clear in linkis-scheduler (#3178)
---
.../linkis/scheduler/AbstractScheduler.scala | 3 +-
.../scheduler/executer/AbstractExecutor.scala | 2 +-
.../linkis/scheduler/queue/AbstractGroup.scala | 10 +++----
.../org/apache/linkis/scheduler/queue/Job.scala | 27 +++++++++++-------
.../apache/linkis/scheduler/queue/LockJob.scala | 4 +--
.../linkis/scheduler/queue/LoopArrayQueue.scala | 9 +++---
.../linkis/scheduler/queue/SchedulerEvent.scala | 11 +++++---
.../apache/linkis/scheduler/queue/UserJob.scala | 8 +++---
.../scheduler/queue/fifoqueue/FIFOGroup.scala | 33 ++++++++++++----------
.../queue/fifoqueue/FIFOUserConsumer.scala | 20 +++++++------
.../parallelqueue/ParallelConsumerManager.scala | 17 +++++++----
11 files changed, 83 insertions(+), 61 deletions(-)
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
index a05688ab8..c34185322 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
@@ -53,11 +53,12 @@ abstract class AbstractScheduler extends Scheduler {
getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(group.getGroupName)
val index = consumer.getConsumeQueue.offer(event)
index.map(getEventId(_, group.getGroupName)).foreach(event.setId)
- if (index.isEmpty)
+ if (index.isEmpty) {
throw new SchedulerErrorException(
12001,
"The submission job failed and the queue is full!(提交作业失败,队列已满!)"
)
+ }
}
override def get(event: SchedulerEvent): Option[SchedulerEvent] = get(event.getId)
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala
index 9b2384dd1..b733c5156 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala
@@ -106,7 +106,7 @@ abstract class AbstractExecutor(id: Long) extends Executor with Logging {
override def getExecutorInfo: ExecutorInfo = ExecutorInfo(id, _state)
- def getLastActivityTime = lastActivityTime
+ def getLastActivityTime: Long = lastActivityTime
def setLastActivityTime(lastActivityTime: Long): Unit = this.lastActivityTime = lastActivityTime
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala
index 824e81ba4..6e9ecbd26 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala
@@ -25,14 +25,14 @@ abstract class AbstractGroup extends Group {
private var maxRunningJobs: Int = _
private var maxAskExecutorTimes: Long = 0L
- def setMaxRunningJobs(maxRunningJobs: Int) = this.maxRunningJobs = maxRunningJobs
- def getMaxRunningJobs = maxRunningJobs
+ def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs
+ def getMaxRunningJobs: Int = maxRunningJobs
- def setMaxAskExecutorTimes(maxAskExecutorTimes: Long) = this.maxAskExecutorTimes =
+ def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes =
maxAskExecutorTimes
- def getMaxAskExecutorTimes = maxAskExecutorTimes
+ def getMaxAskExecutorTimes: Long = maxAskExecutorTimes
override def getStatus: GroupStatus = _status
- def setStatus(status: GroupStatus) = this._status = status
+ def setStatus(status: GroupStatus): Unit = this._status = status
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
index d708cbfb6..a746f9504 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
@@ -70,7 +70,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
jobDaemon
}
- override def cancel() = kill()
+ override def cancel(): Unit = kill()
override def getId(): String = super.getId
@@ -219,11 +219,11 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
val state = getState
executeCompleted match {
case _: SuccessExecuteResponse =>
- if (!interrupt)
+ if (!interrupt) {
Utils.tryAndWarnMsg(transition(Succeed))(
s"update Job $toString from $state to Succeed failed."
)
- else transitionCompleted(errorExecuteResponse)
+ } else transitionCompleted(errorExecuteResponse)
case e: ErrorExecuteResponse =>
val canRetry = Utils.tryCatch(isJobShouldRetry(e)) { t =>
logger.error(s"Job $toString failed to get the retry information!", t)
@@ -269,8 +269,9 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
case _ => false
})
- final def isJobCanRetry: Boolean = if (!isJobSupportRetry || getState != WaitForRetry) false
- else
+ final def isJobCanRetry: Boolean = if (!isJobSupportRetry || getState != WaitForRetry) {
+ false
+ } else {
synchronized {
if (getState == WaitForRetry && (getMaxRetryNum < 1 || retryNum < getMaxRetryNum)) true
else if (WaitForRetry == getState && getMaxRetryNum > 0 && retryNum >= getMaxRetryNum) {
@@ -284,9 +285,11 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
false
} else false
}
+ }
- final def turnToRetry(): Boolean = if (!isJobSupportRetry || getState != WaitForRetry) false
- else
+ final def turnToRetry(): Boolean = if (!isJobSupportRetry || getState != WaitForRetry) {
+ false
+ } else {
synchronized(Utils.tryThrow {
if (isJobCanRetry) {
transition(Scheduled)
@@ -297,6 +300,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
retryNum += 1
t
})
+ }
override def run(): Unit = {
if (!isScheduled || interrupt) return
@@ -329,8 +333,9 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
case r: AsynReturnExecuteResponse =>
r.notify(r1 => {
val realRS =
- if (interrupt) errorExecuteResponse
- else
+ if (interrupt) {
+ errorExecuteResponse
+ } else {
r1 match {
case r: IncompleteExecuteResponse =>
ErrorExecuteResponse(
@@ -339,6 +344,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
)
case r: CompletedExecuteResponse => r
}
+ }
transitionCompleted(realRS)
})
}
@@ -398,10 +404,11 @@ class JobDaemon(job: Job, listenerUpdateIntervalMs: Long, executor: Executor)
lastProgress = progress
}
val log = Utils.tryAndWarnMsg(getLog)(s"Can not get logs from $executor for job $job.")
- if (StringUtils.isNotEmpty(log))
+ if (StringUtils.isNotEmpty(log)) {
Utils.tryAndWarnMsg(job.getLogListener.foreach(_.onLogUpdate(job, log)))(
s"Can not update logs for job $job."
)
+ }
Utils.tryQuietly(Thread.sleep(listenerUpdateIntervalMs))
}
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala
index 3a1fe36a9..c1defb3b9 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala
@@ -21,6 +21,6 @@ abstract class LockJob extends Job {
private var lock: String = _
- def setLock(lock: String) = this.lock = lock
- def getLock = lock
+ def setLock(lock: String): Unit = this.lock = lock
+ def getLock: String = lock
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
index 94adf4859..b0bbfd3c2 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
@@ -68,12 +68,13 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
var event: SchedulerEvent = null
eventQueue synchronized {
val _max = max
- if (index < realSize)
+ if (index < realSize) {
throw new IllegalArgumentException(
"The index " + index + " has already been deleted, now index must be better than " + realSize
)
- else if (index > _max)
+ } else if (index > _max) {
throw new IllegalArgumentException("The index " + index + " must be less than " + _max)
+ }
val _index = (flag + (index - realSize)) % maxCapacity
event = eventQueue(_index).asInstanceOf[SchedulerEvent]
}
@@ -86,9 +87,9 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
this.group = group
}
- def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (filledSize == 0)
+ def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (filledSize == 0) {
IndexedSeq.empty[SchedulerEvent]
- else eventQueue synchronized { (min to max).map(x => get(x).get).filter(x => x != None) }
+ } else eventQueue synchronized { (min to max).map(x => get(x).get).filter(x => x != None) }
def add(event: SchedulerEvent): Int = {
eventQueue synchronized {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index c60753a3f..007def269 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -37,7 +37,7 @@ trait SchedulerEvent extends Logging {
* To be compatible with old versions.
* It's not recommended to use scheduledTime, which is usually only a few milliseconds.
*/
- @Deprecated
+ @deprecated
def getScheduledTime: Long = scheduledTime
def getId: String = id
@@ -47,8 +47,9 @@ trait SchedulerEvent extends Logging {
this synchronized notify()
}
- def turnToScheduled(): Boolean = if (!isWaiting) false
- else
+ def turnToScheduled(): Boolean = if (!isWaiting) {
+ false
+ } else {
this synchronized {
if (!isWaiting) false
else {
@@ -58,6 +59,7 @@ trait SchedulerEvent extends Logging {
true
}
}
+ }
def pause(): Unit
def resume(): Unit
@@ -83,11 +85,12 @@ trait SchedulerEvent extends Logging {
def beforeStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {}
protected def transition(state: SchedulerEventState): Unit = synchronized {
- if (state.id < this.state.id && state != WaitForRetry)
+ if (state.id < this.state.id && state != WaitForRetry) {
throw new SchedulerErrorException(
12000,
s"Task status flip error! Cause: Failed to flip from ${this.state} to $state.(任务状态翻转出错!原因:不允许从${this.state} 翻转为$state.)"
) // 抛异常
+ }
logger.info(s"$toString change status ${this.state} => $state.")
val oldState = this.state
this.state = state
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala
index 3733ad641..f7649c4a8 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala
@@ -19,13 +19,13 @@ package org.apache.linkis.scheduler.queue
case class UserJob() extends Job {
- override def init() = {}
+ override def init(): Unit = {}
override protected def jobToExecuteRequest = null
- override def getName = null
+ override def getName: String = null
- override def getJobInfo = null
+ override def getJobInfo: JobInfo = null
- override def close() = {}
+ override def close(): Unit = {}
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala
index 186318dfc..e2204eddf 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala
@@ -21,40 +21,43 @@ import org.apache.linkis.scheduler.queue.{AbstractGroup, SchedulerEvent}
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
class FIFOGroup(groupName: String, initCapacity: Int, maxCapacity: Int) extends AbstractGroup {
private var maxAskInterval = 30000L
private var minAskInterval = 10000L
- def getMaxAskInterval = maxAskInterval
- def setMaxAskInterval(maxAskInterval: Long) = this.maxAskInterval = maxAskInterval
- def getMinAskInterval = minAskInterval
- def setMinAskInterval(minAskInterval: Long) = this.minAskInterval = minAskInterval
+ def getMaxAskInterval: Long = maxAskInterval
+ def setMaxAskInterval(maxAskInterval: Long): Unit = this.maxAskInterval = maxAskInterval
+ def getMinAskInterval: Long = minAskInterval
+ def setMinAskInterval(minAskInterval: Long): Unit = this.minAskInterval = minAskInterval
- def getMaxAskExecutorDuration = if (getMaxAskExecutorTimes <= 0) Duration.Inf
- else Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
+ def getMaxAskExecutorDuration: Duration = if (getMaxAskExecutorTimes <= 0) {
+ Duration.Inf
+ } else {
+ Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
+ }
- def getAskExecutorInterval = if (getMaxAskExecutorTimes <= 0)
+ def getAskExecutorInterval: Duration = if (getMaxAskExecutorTimes <= 0) {
Duration(maxAskInterval, TimeUnit.MILLISECONDS)
- else if (getMaxAskExecutorTimes > maxAskInterval)
+ } else if (getMaxAskExecutorTimes > maxAskInterval) {
Duration(
math.min(math.max(getMaxAskExecutorTimes / 10, minAskInterval), maxAskInterval),
TimeUnit.MILLISECONDS
)
- else if (getMaxAskExecutorTimes > minAskInterval)
+ } else if (getMaxAskExecutorTimes > minAskInterval) {
Duration(minAskInterval, TimeUnit.MILLISECONDS)
- else Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
+ } else Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
- override def getGroupName = groupName
+ override def getGroupName: String = groupName
/**
* The percentage of waiting Jobs in the entire ConsumeQueue(等待的Job占整个ConsumeQueue的百分比)
*
* @return
*/
- override def getInitCapacity = initCapacity
+ override def getInitCapacity: Int = initCapacity
/**
* The waiting Job accounts for the largest percentage of the entire
@@ -62,6 +65,6 @@ class FIFOGroup(groupName: String, initCapacity: Int, maxCapacity: Int) extends
*
* @return
*/
- override def getMaximumCapacity = maxCapacity
- override def belongTo(event: SchedulerEvent) = true
+ override def getMaximumCapacity: Int = maxCapacity
+ override def belongTo(event: SchedulerEvent): Boolean = true
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index abbf17a27..787c5aa06 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -57,19 +57,20 @@ class FIFOUserConsumer(
bdpFutureTask = new BDPFutureTask(this.future)
}
- override def setConsumeQueue(consumeQueue: ConsumeQueue) = {
+ override def setConsumeQueue(consumeQueue: ConsumeQueue): Unit = {
queue = consumeQueue
}
- override def getConsumeQueue = queue
+ override def getConsumeQueue: ConsumeQueue = queue
- override def getGroup = fifoGroup
+ override def getGroup: FIFOGroup = fifoGroup
- override def setGroup(group: Group) = {
+ override def setGroup(group: Group): Unit = {
this.fifoGroup = group.asInstanceOf[FIFOGroup]
}
- override def getRunningEvents = getEvents(e => e.isRunning || e.isWaitForRetry)
+ override def getRunningEvents: Array[SchedulerEvent] =
+ getEvents(e => e.isRunning || e.isWaitForRetry)
private def getEvents(op: SchedulerEvent => Boolean): Array[SchedulerEvent] = {
val result = ArrayBuffer[SchedulerEvent]()
@@ -77,7 +78,7 @@ class FIFOUserConsumer(
result.toArray
}
- override def run() = {
+ override def run(): Unit = {
Thread.currentThread().setName(s"${toString}Thread")
logger.info(s"$toString thread started!")
while (!terminate) {
@@ -121,8 +122,9 @@ class FIFOUserConsumer(
false
}
)
- ) takeEvent
- else getWaitForRetryEvent
+ ) {
+ takeEvent
+ } else getWaitForRetryEvent
}
}
event.foreach { case job: Job =>
@@ -185,7 +187,7 @@ class FIFOUserConsumer(
runningJobs(index) = job
}
- override def shutdown() = {
+ override def shutdown(): Unit = {
future.cancel(true)
super.shutdown()
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
index 5ce672e64..d16fdbddd 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
@@ -76,9 +76,9 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
this.consumerListener = Some(consumerListener)
}
- override def getOrCreateExecutorService: ExecutorService = if (executorService != null)
+ override def getOrCreateExecutorService: ExecutorService = if (executorService != null) {
executorService
- else
+ } else {
executorServiceLock.synchronized {
if (executorService == null) {
executorService = Utils.newCachedThreadPool(
@@ -89,14 +89,17 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
}
executorService
}
+ }
override def getOrCreateConsumer(groupName: String): Consumer = {
val consumer =
- if (consumerGroupMap.contains(groupName)) consumerGroupMap(groupName)
- else
+ if (consumerGroupMap.contains(groupName)) {
+ consumerGroupMap(groupName)
+ } else {
CONSUMER_LOCK.synchronized {
- if (consumerGroupMap.contains(groupName)) consumerGroupMap(groupName)
- else
+ if (consumerGroupMap.contains(groupName)) {
+ consumerGroupMap(groupName)
+ } else {
consumerGroupMap.getOrElseUpdate(
groupName, {
val newConsumer = createConsumer(groupName)
@@ -108,7 +111,9 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
newConsumer
}
)
+ }
}
+ }
consumer.setLastTime(System.currentTimeMillis())
consumer
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org