You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@livy.apache.org by GitBox <gi...@apache.org> on 2019/12/06 02:48:13 UTC

[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644854
 
 

 ##########
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##########
 @@ -56,20 +69,55 @@ object SparkYarnApp extends Logging {
     c
   }
 
-  private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration =
-    livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT) milliseconds
-
-  private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
-    livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
-
   private[utils] val appType = Set("SPARK").asJava
 
   private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
 
+  private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()
+
+  private val appQueue = new ConcurrentLinkedQueue[SparkYarnApp]()
+
   private var sessionLeakageCheckTimeout: Long = _
 
   private var sessionLeakageCheckInterval: Long = _
 
+  private var yarnAppMonitorThreadInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockCheckInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockTimeout: Long = _
+
+  private var yarnAppMonitorMaxFailedTimes: Long = _
+
+  private var yarnTagToAppIdMaxFailedTimes: Long = _
+
+  private val checkMonitorAppTimeoutThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        try {
+          val iter = monitorAppThreadMap.entrySet().iterator()
+          val now = System.currentTimeMillis()
+
+          while (iter.hasNext) {
+            val entry = iter.next()
+            val thread = entry.getKey
+            val updatedTime = entry.getValue
+
+            if (now - updatedTime - yarnAppMonitorThreadInterval >
+              yarnAppMonitorThreadBlockTimeout) {
+              thread.interrupt()
+            }
+          }
+
+          Thread.sleep(yarnAppMonitorThreadBlockCheckInterval)
+        } catch {
+          case e: InterruptedException =>
+            error(s"checkMonitorAppTimeoutThread Exception whiling monitor", e)
 
 Review comment:
   @jerryshao updated

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services