You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2019/11/25 03:44:51 UTC

[incubator-livy] branch master updated: [LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new cccba94  [LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout
cccba94 is described below

commit cccba9480e2db821d6cc67f580eeb67f2fac4e95
Author: runzhiwang <ru...@tencent.com>
AuthorDate: Mon Nov 25 11:44:37 2019 +0800

    [LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout
    
    ## What changes were proposed in this pull request?
    1. `var isRemoved = false` should be declared inside the `while (iter.hasNext)` loop. Otherwise, consider two leaked apps where the first is found and killed via `killApplication` and the second has timed out in the same pass. After the first app is removed, `isRemoved` is already `true`. The second app therefore fails the `if (!isRemoved)` check and is only deleted on the next pass.
    
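    2. `entry.getValue - now` is negative, because the stored timestamp is earlier than `now`. It is therefore never greater than `sessionLeakageCheckTimeout`, and timed-out tags are never removed. The elapsed time must be computed as `now - entry.getValue`.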
    
    ![image](https://user-images.githubusercontent.com/51938049/69202431-99a81080-0b7c-11ea-8084-9801af5a75bd.png)
    
    ## How was this patch tested?
    
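    Existing integration tests (IT) and unit tests (UT), plus a new unit test in `SparkYarnAppSpec` that checks a leaked app tag is removed after the timeout.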
    
    Author: runzhiwang <ru...@tencent.com>
    
    Closes #259 from runzhiwang/leakapp.
---
 .../scala/org/apache/livy/utils/SparkYarnApp.scala | 27 ++++++++++++++++------
 .../org/apache/livy/utils/SparkYarnAppSpec.scala   | 21 +++++++++++++++++
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 14af9fa..a245823 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -37,7 +37,8 @@ import org.apache.livy.{LivyConf, Logging, Utils}
 
 object SparkYarnApp extends Logging {
 
-  def init(livyConf: LivyConf): Unit = {
+  def init(livyConf: LivyConf, client: Option[YarnClient] = None): Unit = {
+    mockYarnClient = client
     sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL)
     sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT)
     leakedAppsGCThread.setDaemon(true)
@@ -45,6 +46,8 @@ object SparkYarnApp extends Logging {
     leakedAppsGCThread.start()
   }
 
+  private var mockYarnClient: Option[YarnClient] = None
+
   // YarnClient is thread safe. Create once, share it across threads.
   lazy val yarnClient = {
     val c = YarnClient.createYarnClient()
@@ -59,9 +62,9 @@ object SparkYarnApp extends Logging {
   private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
     livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
 
-  private val appType = Set("SPARK").asJava
+  private[utils] val appType = Set("SPARK").asJava
 
-  private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+  private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
 
   private var sessionLeakageCheckTimeout: Long = _
 
@@ -69,24 +72,34 @@ object SparkYarnApp extends Logging {
 
   private val leakedAppsGCThread = new Thread() {
     override def run(): Unit = {
+      val client = {
+        mockYarnClient match {
+          case Some(client) => client
+          case None => yarnClient
+        }
+      }
+
       while (true) {
         if (!leakedAppTags.isEmpty) {
           // kill the app if found it and remove it if exceeding a threshold
           val iter = leakedAppTags.entrySet().iterator()
-          var isRemoved = false
           val now = System.currentTimeMillis()
-          val apps = yarnClient.getApplications(appType).asScala
+          val apps = client.getApplications(appType).asScala
+
           while(iter.hasNext) {
+            var isRemoved = false
             val entry = iter.next()
+
             apps.find(_.getApplicationTags.contains(entry.getKey))
               .foreach({ e =>
                 info(s"Kill leaked app ${e.getApplicationId}")
-                yarnClient.killApplication(e.getApplicationId)
+                client.killApplication(e.getApplicationId)
                 iter.remove()
                 isRemoved = true
               })
+
             if (!isRemoved) {
-              if ((entry.getValue - now) > sessionLeakageCheckTimeout) {
+              if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
                 iter.remove()
                 info(s"Remove leaked yarn app tag ${entry.getKey}")
               }
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index 064bb77..d43125d 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.livy.utils
 
+import java.util.ArrayList
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
@@ -461,5 +462,25 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
         }
       }
     }
+
+    it("should delete leak app when timeout") {
+      Clock.withSleepMethod(mockSleep) {
+        livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
+        livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")
+
+        val client = mock[YarnClient]
+        when(client.getApplications(SparkYarnApp.appType)).
+          thenReturn(new ArrayList[ApplicationReport]())
+
+        SparkYarnApp.init(livyConf, Some(client))
+
+        SparkYarnApp.leakedAppTags.clear()
+        SparkYarnApp.leakedAppTags.put("leakApp", System.currentTimeMillis())
+
+        Eventually.eventually(Eventually.timeout(TEST_TIMEOUT), Eventually.interval(100 millis)) {
+          assert(SparkYarnApp.leakedAppTags.size() == 0)
+        }
+      }
+    }
   }
 }