Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/03 06:12:11 UTC

[incubator-linkis] branch dev-1.3.1 updated: feat:Scala code format alarm clear in linkis-scheduler (#3178)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 91b3316b2 feat:Scala code format alarm clear in linkis-scheduler (#3178)
91b3316b2 is described below

commit 91b3316b27433dc33b54e6afe592ecdb9664e84a
Author: ruY <43...@users.noreply.github.com>
AuthorDate: Sat Sep 3 14:12:06 2022 +0800

    feat:Scala code format alarm clear in linkis-scheduler (#3178)
---
 .../linkis/scheduler/AbstractScheduler.scala       |  3 +-
 .../scheduler/executer/AbstractExecutor.scala      |  2 +-
 .../linkis/scheduler/queue/AbstractGroup.scala     | 10 +++----
 .../org/apache/linkis/scheduler/queue/Job.scala    | 27 +++++++++++-------
 .../apache/linkis/scheduler/queue/LockJob.scala    |  4 +--
 .../linkis/scheduler/queue/LoopArrayQueue.scala    |  9 +++---
 .../linkis/scheduler/queue/SchedulerEvent.scala    | 11 +++++---
 .../apache/linkis/scheduler/queue/UserJob.scala    |  8 +++---
 .../scheduler/queue/fifoqueue/FIFOGroup.scala      | 33 ++++++++++++----------
 .../queue/fifoqueue/FIFOUserConsumer.scala         | 20 +++++++------
 .../parallelqueue/ParallelConsumerManager.scala    | 17 +++++++----
 11 files changed, 83 insertions(+), 61 deletions(-)
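
The hunks below apply two recurring Scala style rules to clear compiler/style warnings: public members get explicit result types instead of relying on inference, and multi-line if/else branches are wrapped in braces. A minimal before/after sketch of that shape (the Counter class and its members are illustrative only, not part of this patch):

    // Illustrative only; not code from this commit.
    class Counter {
      private var count: Int = 0

      // Before: `def getCount = count` leaves the result type inferred and trips the style check.
      // After: the result type is spelled out.
      def getCount: Int = count
      def setCount(count: Int): Unit = this.count = count

      // Multi-line if/else branches gain braces instead of relying on indentation alone.
      def describe(limit: Int): String = if (count > limit) {
        s"count $count exceeds limit $limit"
      } else {
        s"count $count is within limit $limit"
      }
    }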

diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
index a05688ab8..c34185322 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
@@ -53,11 +53,12 @@ abstract class AbstractScheduler extends Scheduler {
       getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(group.getGroupName)
     val index = consumer.getConsumeQueue.offer(event)
     index.map(getEventId(_, group.getGroupName)).foreach(event.setId)
-    if (index.isEmpty)
+    if (index.isEmpty) {
       throw new SchedulerErrorException(
         12001,
         "The submission job failed and the queue is full!(提交作业失败,队列已满!)"
       )
+    }
   }
 
   override def get(event: SchedulerEvent): Option[SchedulerEvent] = get(event.getId)
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala
index 9b2384dd1..b733c5156 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/executer/AbstractExecutor.scala
@@ -106,7 +106,7 @@ abstract class AbstractExecutor(id: Long) extends Executor with Logging {
 
   override def getExecutorInfo: ExecutorInfo = ExecutorInfo(id, _state)
 
-  def getLastActivityTime = lastActivityTime
+  def getLastActivityTime: Long = lastActivityTime
 
   def setLastActivityTime(lastActivityTime: Long): Unit = this.lastActivityTime = lastActivityTime
 
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala
index 824e81ba4..6e9ecbd26 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala
@@ -25,14 +25,14 @@ abstract class AbstractGroup extends Group {
   private var maxRunningJobs: Int = _
   private var maxAskExecutorTimes: Long = 0L
 
-  def setMaxRunningJobs(maxRunningJobs: Int) = this.maxRunningJobs = maxRunningJobs
-  def getMaxRunningJobs = maxRunningJobs
+  def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs
+  def getMaxRunningJobs: Int = maxRunningJobs
 
-  def setMaxAskExecutorTimes(maxAskExecutorTimes: Long) = this.maxAskExecutorTimes =
+  def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes =
     maxAskExecutorTimes
 
-  def getMaxAskExecutorTimes = maxAskExecutorTimes
+  def getMaxAskExecutorTimes: Long = maxAskExecutorTimes
 
   override def getStatus: GroupStatus = _status
-  def setStatus(status: GroupStatus) = this._status = status
+  def setStatus(status: GroupStatus): Unit = this._status = status
 }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
index d708cbfb6..a746f9504 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
@@ -70,7 +70,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
     jobDaemon
   }
 
-  override def cancel() = kill()
+  override def cancel(): Unit = kill()
 
   override def getId(): String = super.getId
 
@@ -219,11 +219,11 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
     val state = getState
     executeCompleted match {
       case _: SuccessExecuteResponse =>
-        if (!interrupt)
+        if (!interrupt) {
           Utils.tryAndWarnMsg(transition(Succeed))(
             s"update Job $toString from $state to Succeed failed."
           )
-        else transitionCompleted(errorExecuteResponse)
+        } else transitionCompleted(errorExecuteResponse)
       case e: ErrorExecuteResponse =>
         val canRetry = Utils.tryCatch(isJobShouldRetry(e)) { t =>
           logger.error(s"Job $toString failed to get the retry information!", t)
@@ -269,8 +269,9 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
       case _ => false
     })
 
-  final def isJobCanRetry: Boolean = if (!isJobSupportRetry || getState != WaitForRetry) false
-  else
+  final def isJobCanRetry: Boolean = if (!isJobSupportRetry || getState != WaitForRetry) {
+    false
+  } else {
     synchronized {
       if (getState == WaitForRetry && (getMaxRetryNum < 1 || retryNum < getMaxRetryNum)) true
       else if (WaitForRetry == getState && getMaxRetryNum > 0 && retryNum >= getMaxRetryNum) {
@@ -284,9 +285,11 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
         false
       } else false
     }
+  }
 
-  final def turnToRetry(): Boolean = if (!isJobSupportRetry || getState != WaitForRetry) false
-  else
+  final def turnToRetry(): Boolean = if (!isJobSupportRetry || getState != WaitForRetry) {
+    false
+  } else {
     synchronized(Utils.tryThrow {
       if (isJobCanRetry) {
         transition(Scheduled)
@@ -297,6 +300,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
       retryNum += 1
       t
     })
+  }
 
   override def run(): Unit = {
     if (!isScheduled || interrupt) return
@@ -329,8 +333,9 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
       case r: AsynReturnExecuteResponse =>
         r.notify(r1 => {
           val realRS =
-            if (interrupt) errorExecuteResponse
-            else
+            if (interrupt) {
+              errorExecuteResponse
+            } else {
               r1 match {
                 case r: IncompleteExecuteResponse =>
                   ErrorExecuteResponse(
@@ -339,6 +344,7 @@ abstract class Job extends Runnable with SchedulerEvent with Closeable with Logg
                   )
                 case r: CompletedExecuteResponse => r
               }
+            }
           transitionCompleted(realRS)
         })
     }
@@ -398,10 +404,11 @@ class JobDaemon(job: Job, listenerUpdateIntervalMs: Long, executor: Executor)
         lastProgress = progress
       }
       val log = Utils.tryAndWarnMsg(getLog)(s"Can not get logs from $executor for job $job.")
-      if (StringUtils.isNotEmpty(log))
+      if (StringUtils.isNotEmpty(log)) {
         Utils.tryAndWarnMsg(job.getLogListener.foreach(_.onLogUpdate(job, log)))(
           s"Can not update logs for job $job."
         )
+      }
       Utils.tryQuietly(Thread.sleep(listenerUpdateIntervalMs))
     }
   }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala
index 3a1fe36a9..c1defb3b9 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LockJob.scala
@@ -21,6 +21,6 @@ abstract class LockJob extends Job {
 
   private var lock: String = _
 
-  def setLock(lock: String) = this.lock = lock
-  def getLock = lock
+  def setLock(lock: String): Unit = this.lock = lock
+  def getLock: String = lock
 }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
index 94adf4859..b0bbfd3c2 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
@@ -68,12 +68,13 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
     var event: SchedulerEvent = null
     eventQueue synchronized {
       val _max = max
-      if (index < realSize)
+      if (index < realSize) {
         throw new IllegalArgumentException(
           "The index " + index + " has already been deleted, now index must be better than " + realSize
         )
-      else if (index > _max)
+      } else if (index > _max) {
         throw new IllegalArgumentException("The index " + index + " must be less than " + _max)
+      }
       val _index = (flag + (index - realSize)) % maxCapacity
       event = eventQueue(_index).asInstanceOf[SchedulerEvent]
     }
@@ -86,9 +87,9 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {
     this.group = group
   }
 
-  def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (filledSize == 0)
+  def toIndexedSeq: IndexedSeq[SchedulerEvent] = if (filledSize == 0) {
     IndexedSeq.empty[SchedulerEvent]
-  else eventQueue synchronized { (min to max).map(x => get(x).get).filter(x => x != None) }
+  } else eventQueue synchronized { (min to max).map(x => get(x).get).filter(x => x != None) }
 
   def add(event: SchedulerEvent): Int = {
     eventQueue synchronized {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
index c60753a3f..007def269 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEvent.scala
@@ -37,7 +37,7 @@ trait SchedulerEvent extends Logging {
    * To be compatible with old versions.
    * It's not recommonded to use scheduledTime, which was only several mills at most time.
    */
-  @Deprecated
+  @deprecated
   def getScheduledTime: Long = scheduledTime
 
   def getId: String = id
@@ -47,8 +47,9 @@ trait SchedulerEvent extends Logging {
     this synchronized notify()
   }
 
-  def turnToScheduled(): Boolean = if (!isWaiting) false
-  else
+  def turnToScheduled(): Boolean = if (!isWaiting) {
+    false
+  } else {
     this synchronized {
       if (!isWaiting) false
       else {
@@ -58,6 +59,7 @@ trait SchedulerEvent extends Logging {
         true
       }
     }
+  }
 
   def pause(): Unit
   def resume(): Unit
@@ -83,11 +85,12 @@ trait SchedulerEvent extends Logging {
   def beforeStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {}
 
   protected def transition(state: SchedulerEventState): Unit = synchronized {
-    if (state.id < this.state.id && state != WaitForRetry)
+    if (state.id < this.state.id && state != WaitForRetry) {
       throw new SchedulerErrorException(
         12000,
         s"Task status flip error! Cause: Failed to flip from ${this.state} to $state.(任务状态翻转出错!原因:不允许从${this.state} 翻转为$state.)"
       ) // 抛异常
+    }
     logger.info(s"$toString change status ${this.state} => $state.")
     val oldState = this.state
     this.state = state
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala
index 3733ad641..f7649c4a8 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/UserJob.scala
@@ -19,13 +19,13 @@ package org.apache.linkis.scheduler.queue
 
 case class UserJob() extends Job {
 
-  override def init() = {}
+  override def init(): Unit = {}
 
   override protected def jobToExecuteRequest = null
 
-  override def getName = null
+  override def getName: String = null
 
-  override def getJobInfo = null
+  override def getJobInfo: JobInfo = null
 
-  override def close() = {}
+  override def close(): Unit = {}
 }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala
index 186318dfc..e2204eddf 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroup.scala
@@ -21,40 +21,43 @@ import org.apache.linkis.scheduler.queue.{AbstractGroup, SchedulerEvent}
 
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 class FIFOGroup(groupName: String, initCapacity: Int, maxCapacity: Int) extends AbstractGroup {
 
   private var maxAskInterval = 30000L
   private var minAskInterval = 10000L
 
-  def getMaxAskInterval = maxAskInterval
-  def setMaxAskInterval(maxAskInterval: Long) = this.maxAskInterval = maxAskInterval
-  def getMinAskInterval = minAskInterval
-  def setMinAskInterval(minAskInterval: Long) = this.minAskInterval = minAskInterval
+  def getMaxAskInterval: Long = maxAskInterval
+  def setMaxAskInterval(maxAskInterval: Long): Unit = this.maxAskInterval = maxAskInterval
+  def getMinAskInterval: Long = minAskInterval
+  def setMinAskInterval(minAskInterval: Long): Unit = this.minAskInterval = minAskInterval
 
-  def getMaxAskExecutorDuration = if (getMaxAskExecutorTimes <= 0) Duration.Inf
-  else Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
+  def getMaxAskExecutorDuration: Duration = if (getMaxAskExecutorTimes <= 0) {
+    Duration.Inf
+  } else {
+    Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
+  }
 
-  def getAskExecutorInterval = if (getMaxAskExecutorTimes <= 0)
+  def getAskExecutorInterval: Duration = if (getMaxAskExecutorTimes <= 0) {
     Duration(maxAskInterval, TimeUnit.MILLISECONDS)
-  else if (getMaxAskExecutorTimes > maxAskInterval)
+  } else if (getMaxAskExecutorTimes > maxAskInterval) {
     Duration(
       math.min(math.max(getMaxAskExecutorTimes / 10, minAskInterval), maxAskInterval),
       TimeUnit.MILLISECONDS
     )
-  else if (getMaxAskExecutorTimes > minAskInterval)
+  } else if (getMaxAskExecutorTimes > minAskInterval) {
     Duration(minAskInterval, TimeUnit.MILLISECONDS)
-  else Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
+  } else Duration(getMaxAskExecutorTimes, TimeUnit.MILLISECONDS)
 
-  override def getGroupName = groupName
+  override def getGroupName: String = groupName
 
   /**
    * The percentage of waiting Jobs in the entire ConsumeQueue(等待的Job占整个ConsumeQueue的百分比)
    *
    * @return
    */
-  override def getInitCapacity = initCapacity
+  override def getInitCapacity: Int = initCapacity
 
   /**
    * The waiting Job accounts for the largest percentage of the entire
@@ -62,6 +65,6 @@ class FIFOGroup(groupName: String, initCapacity: Int, maxCapacity: Int) extends
    *
    * @return
    */
-  override def getMaximumCapacity = maxCapacity
-  override def belongTo(event: SchedulerEvent) = true
+  override def getMaximumCapacity: Int = maxCapacity
+  override def belongTo(event: SchedulerEvent): Boolean = true
 }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index abbf17a27..787c5aa06 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -57,19 +57,20 @@ class FIFOUserConsumer(
     bdpFutureTask = new BDPFutureTask(this.future)
   }
 
-  override def setConsumeQueue(consumeQueue: ConsumeQueue) = {
+  override def setConsumeQueue(consumeQueue: ConsumeQueue): Unit = {
     queue = consumeQueue
   }
 
-  override def getConsumeQueue = queue
+  override def getConsumeQueue: ConsumeQueue = queue
 
-  override def getGroup = fifoGroup
+  override def getGroup: FIFOGroup = fifoGroup
 
-  override def setGroup(group: Group) = {
+  override def setGroup(group: Group): Unit = {
     this.fifoGroup = group.asInstanceOf[FIFOGroup]
   }
 
-  override def getRunningEvents = getEvents(e => e.isRunning || e.isWaitForRetry)
+  override def getRunningEvents: Array[SchedulerEvent] =
+    getEvents(e => e.isRunning || e.isWaitForRetry)
 
   private def getEvents(op: SchedulerEvent => Boolean): Array[SchedulerEvent] = {
     val result = ArrayBuffer[SchedulerEvent]()
@@ -77,7 +78,7 @@ class FIFOUserConsumer(
     result.toArray
   }
 
-  override def run() = {
+  override def run(): Unit = {
     Thread.currentThread().setName(s"${toString}Thread")
     logger.info(s"$toString thread started!")
     while (!terminate) {
@@ -121,8 +122,9 @@ class FIFOUserConsumer(
                   false
                 }
               )
-          ) takeEvent
-          else getWaitForRetryEvent
+          ) {
+            takeEvent
+          } else getWaitForRetryEvent
       }
     }
     event.foreach { case job: Job =>
@@ -185,7 +187,7 @@ class FIFOUserConsumer(
     runningJobs(index) = job
   }
 
-  override def shutdown() = {
+  override def shutdown(): Unit = {
     future.cancel(true)
     super.shutdown()
   }
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
index 5ce672e64..d16fdbddd 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
@@ -76,9 +76,9 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
     this.consumerListener = Some(consumerListener)
   }
 
-  override def getOrCreateExecutorService: ExecutorService = if (executorService != null)
+  override def getOrCreateExecutorService: ExecutorService = if (executorService != null) {
     executorService
-  else
+  } else {
     executorServiceLock.synchronized {
       if (executorService == null) {
         executorService = Utils.newCachedThreadPool(
@@ -89,14 +89,17 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
       }
       executorService
     }
+  }
 
   override def getOrCreateConsumer(groupName: String): Consumer = {
     val consumer =
-      if (consumerGroupMap.contains(groupName)) consumerGroupMap(groupName)
-      else
+      if (consumerGroupMap.contains(groupName)) {
+        consumerGroupMap(groupName)
+      } else {
         CONSUMER_LOCK.synchronized {
-          if (consumerGroupMap.contains(groupName)) consumerGroupMap(groupName)
-          else
+          if (consumerGroupMap.contains(groupName)) {
+            consumerGroupMap(groupName)
+          } else {
             consumerGroupMap.getOrElseUpdate(
               groupName, {
                 val newConsumer = createConsumer(groupName)
@@ -108,7 +111,9 @@ class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
                 newConsumer
               }
             )
+          }
         }
+      }
     consumer.setLastTime(System.currentTimeMillis())
     consumer
   }
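
The final hunk keeps the existing double-checked creation pattern in getOrCreateConsumer: the map is consulted once without the lock, then re-checked under CONSUMER_LOCK before a new consumer is built, so concurrent callers for the same group end up sharing one instance. A standalone sketch of that pattern, mirroring the patched code (names such as Registry and makeValue are illustrative, not Linkis APIs):

    import scala.collection.mutable

    // Illustrative sketch of the check / lock / re-check pattern shown above.
    class Registry[V](makeValue: String => V) {
      private val lock = new Object
      private val values = mutable.Map[String, V]()

      def getOrCreate(key: String): V =
        if (values.contains(key)) {
          values(key) // fast path: already created, no locking needed
        } else {
          lock.synchronized {
            // Re-check inside the lock: another thread may have created it meanwhile.
            values.getOrElseUpdate(key, makeValue(key))
          }
        }
    }

A caller would then invoke registry.getOrCreate("groupA") from any thread and always receive the same instance for that key; as in the patched code, the unsynchronized fast-path read is a deliberate trade-off for the common already-created case.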


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org