You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2019/11/25 03:44:51 UTC
[incubator-livy] branch master updated: [LIVY-714][SERVER] Fix
cannot remove the app in leakedAppTags when timeout
This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new cccba94 [LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout
cccba94 is described below
commit cccba9480e2db821d6cc67f580eeb67f2fac4e95
Author: runzhiwang <ru...@tencent.com>
AuthorDate: Mon Nov 25 11:44:37 2019 +0800
[LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout
## What changes were proposed in this pull request?
1. `var isRemoved = false` should be declared inside `while(iter.hasNext)`. Otherwise, if there are two apps in the same pass, where the first is killed via `killApplication` and the second has timed out, removing the first app sets `isRemoved = true`, so the second app cannot pass the `if(!isRemoved)` check and will only be deleted in the next loop.
2. `entry.getValue - now` is negative and therefore never greater than `sessionLeakageCheckTimeout`; the comparison should use `now - entry.getValue` instead.
![image](https://user-images.githubusercontent.com/51938049/69202431-99a81080-0b7c-11ea-8084-9801af5a75bd.png)
## How was this patch tested?
Existing integration tests (IT) and unit tests (UT), plus a new unit test added to `SparkYarnAppSpec`.
Author: runzhiwang <ru...@tencent.com>
Closes #259 from runzhiwang/leakapp.
---
.../scala/org/apache/livy/utils/SparkYarnApp.scala | 27 ++++++++++++++++------
.../org/apache/livy/utils/SparkYarnAppSpec.scala | 21 +++++++++++++++++
2 files changed, 41 insertions(+), 7 deletions(-)
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 14af9fa..a245823 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -37,7 +37,8 @@ import org.apache.livy.{LivyConf, Logging, Utils}
object SparkYarnApp extends Logging {
- def init(livyConf: LivyConf): Unit = {
+ def init(livyConf: LivyConf, client: Option[YarnClient] = None): Unit = {
+ mockYarnClient = client
sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL)
sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT)
leakedAppsGCThread.setDaemon(true)
@@ -45,6 +46,8 @@ object SparkYarnApp extends Logging {
leakedAppsGCThread.start()
}
+ private var mockYarnClient: Option[YarnClient] = None
+
// YarnClient is thread safe. Create once, share it across threads.
lazy val yarnClient = {
val c = YarnClient.createYarnClient()
@@ -59,9 +62,9 @@ object SparkYarnApp extends Logging {
private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
- private val appType = Set("SPARK").asJava
+ private[utils] val appType = Set("SPARK").asJava
- private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+ private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
private var sessionLeakageCheckTimeout: Long = _
@@ -69,24 +72,34 @@ object SparkYarnApp extends Logging {
private val leakedAppsGCThread = new Thread() {
override def run(): Unit = {
+ val client = {
+ mockYarnClient match {
+ case Some(client) => client
+ case None => yarnClient
+ }
+ }
+
while (true) {
if (!leakedAppTags.isEmpty) {
// kill the app if found it and remove it if exceeding a threshold
val iter = leakedAppTags.entrySet().iterator()
- var isRemoved = false
val now = System.currentTimeMillis()
- val apps = yarnClient.getApplications(appType).asScala
+ val apps = client.getApplications(appType).asScala
+
while(iter.hasNext) {
+ var isRemoved = false
val entry = iter.next()
+
apps.find(_.getApplicationTags.contains(entry.getKey))
.foreach({ e =>
info(s"Kill leaked app ${e.getApplicationId}")
- yarnClient.killApplication(e.getApplicationId)
+ client.killApplication(e.getApplicationId)
iter.remove()
isRemoved = true
})
+
if (!isRemoved) {
- if ((entry.getValue - now) > sessionLeakageCheckTimeout) {
+ if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
iter.remove()
info(s"Remove leaked yarn app tag ${entry.getKey}")
}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index 064bb77..d43125d 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -16,6 +16,7 @@
*/
package org.apache.livy.utils
+import java.util.ArrayList
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
@@ -461,5 +462,25 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
}
}
}
+
+ it("should delete leak app when timeout") {
+ Clock.withSleepMethod(mockSleep) {
+ livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
+ livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")
+
+ val client = mock[YarnClient]
+ when(client.getApplications(SparkYarnApp.appType)).
+ thenReturn(new ArrayList[ApplicationReport]())
+
+ SparkYarnApp.init(livyConf, Some(client))
+
+ SparkYarnApp.leakedAppTags.clear()
+ SparkYarnApp.leakedAppTags.put("leakApp", System.currentTimeMillis())
+
+ Eventually.eventually(Eventually.timeout(TEST_TIMEOUT), Eventually.interval(100 millis)) {
+ assert(SparkYarnApp.leakedAppTags.size() == 0)
+ }
+ }
+ }
}
}