You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2019/08/28 11:05:36 UTC

[incubator-livy] branch master updated: [LIVY-617] Livy session leak on Yarn when creating session duplicated names

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ec3b9b  [LIVY-617] Livy session leak on Yarn when creating session duplicated names
4ec3b9b is described below

commit 4ec3b9b47b556390b2f738df62b5b277fa02f6ef
Author: Shanyu Zhao <sh...@microsoft.com>
AuthorDate: Wed Aug 28 19:04:57 2019 +0800

    [LIVY-617] Livy session leak on Yarn when creating session duplicated names
    
    ## What changes were proposed in this pull request?
    When creating a session with a duplicate name, instead of only throwing an exception in the SessionManager.register() method, we should also stop the session before throwing. Otherwise, the session driver process will keep running and end up as a leaked Yarn application.
    
    https://issues.apache.org/jira/browse/LIVY-617
    
    ## How was this patch tested?
    
    This is a simple fix, verified with a manual end-to-end test.
    
    Author: Shanyu Zhao <sh...@microsoft.com>
    
    Closes #187 from shanyu/shanyu.
---
 .../src/main/scala/org/apache/livy/sessions/SessionManager.scala | 9 ++++++++-
 server/src/test/scala/org/apache/livy/sessions/MockSession.scala | 5 ++++-
 .../test/scala/org/apache/livy/sessions/SessionManagerSpec.scala | 2 ++
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index f8f98a2..f2548ac 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -98,7 +98,10 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
     synchronized {
       session.name.foreach { sessionName =>
         if (sessionsByName.contains(sessionName)) {
-          throw new IllegalArgumentException(s"Duplicate session name: ${session.name}")
+          val errMsg = s"Duplicate session name: ${session.name} for session ${session.id}"
+          error(errMsg)
+          session.stop()
+          throw new IllegalArgumentException(errMsg)
         } else {
           sessionsByName.put(sessionName, session)
         }
@@ -106,6 +109,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
       sessions.put(session.id, session)
       session.start()
     }
+    info(s"Registered new session ${session.id}")
     session
   }
 
@@ -122,6 +126,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
   }
 
   def delete(session: S): Future[Unit] = {
+    info(s"Deleting session ${session.id}")
     session.stop().map { case _ =>
       try {
         sessionStore.remove(sessionType, session.id)
@@ -133,6 +138,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
         case NonFatal(e) =>
           error("Exception was thrown during stop session:", e)
           throw e
+      } finally {
+        info(s"Deleted session ${session.id}")
       }
     }
   }
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
index ddcbd4b..f9609b1 100644
--- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -27,7 +27,10 @@ class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] =
 
   override def start(): Unit = ()
 
-  override protected def stopSession(): Unit = ()
+  var stopped = false
+  override protected def stopSession(): Unit = {
+    stopped = true
+  }
 
   override def logLines(): IndexedSeq[String] = IndexedSeq()
 
diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index a5e9ffa..100c756 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -92,6 +92,8 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
       an[IllegalArgumentException] should be thrownBy manager.register(session2)
       manager.get(session1.id).isDefined should be(true)
       manager.get(session2.id).isDefined should be(false)
+      assert(!session1.stopped)
+      assert(session2.stopped)
       manager.shutdown()
     }