Posted to commits@livy.apache.org by js...@apache.org on 2019/08/15 03:53:43 UTC

[incubator-livy] branch master updated: [LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 01da43d  [LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client
01da43d is described below

commit 01da43dba07aee1e4d13a2a19f233a38546ddec0
Author: Gustavo Martin Morcuende <gu...@gmail.com>
AuthorDate: Thu Aug 15 11:53:20 2019 +0800

    [LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client
    
    ## What changes were proposed in this pull request?
    
    The batch session should end in state dead when the spark-submit process exits with a non-zero return code.
    https://issues.apache.org/jira/browse/LIVY-620
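
    For illustration only, a minimal, self-contained sketch of the transition
    guard this change introduces (names are simplified here; the real types are
    SessionState and FinishedSessionState in org.apache.livy.sessions, see the
    diff below):

        // Sketch: once a session reaches a finished state, later state-change
        // notifications must not overwrite it; a non-zero spark-submit exit
        // code drives the session to Dead instead of leaving it as Success.
        object StateGuardSketch {
          sealed trait State
          sealed trait Finished extends State
          case object Running extends State
          case object Dead extends Finished
          case object Success extends Finished

          private var state: State = Running

          // Apply a new state only if the session has not already finished.
          def transition(next: State): Unit =
            if (!state.isInstanceOf[Finished]) state = next

          def main(args: Array[String]): Unit = {
            transition(Dead)     // spark-submit exited with a non-zero code
            transition(Success)  // ignored: the session has already finished
            println(state)       // prints Dead
          }
        }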
    
    ## How was this patch tested?
    
    1. Unit Test (included in this PR)
    Submit a batch session that runs forever, wait 2 seconds, kill that batch session, and expect its state to be dead.
    
    2. The change is also already in use in a production environment (see the REST-level sketch below).
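
    Independently of the unit test, the user-visible effect can be checked
    against a running Livy server through the REST API. This is a sketch only,
    not part of this patch; it assumes a server on the default
    http://localhost:8998 and an already submitted batch id passed as the
    first argument:

        import scala.io.Source

        object CheckBatchState {
          def main(args: Array[String]): Unit = {
            val batchId = if (args.nonEmpty) args(0) else "0"
            // GET /batches/{batchId}/state returns a small JSON payload such as
            // {"id":0,"state":"dead"}; before this fix a yarn-client batch whose
            // spark-submit exited non-zero could still report "success".
            val body = Source.fromURL(s"http://localhost:8998/batches/$batchId/state").mkString
            println(body)
          }
        }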
    
    Author: Gustavo Martin Morcuende <gu...@gmail.com>
    
    Closes #192 from gumartinm/master.
---
 .../apache/livy/server/batch/BatchSession.scala    | 11 ++++++-
 .../livy/server/batch/BatchSessionSpec.scala       | 38 +++++++++++++++++++++-
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 4b27058..2a55c04 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 import org.apache.livy.{LivyConf, Logging, Utils}
 import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
-import org.apache.livy.sessions.{Session, SessionState}
+import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState}
 import org.apache.livy.sessions.Session._
 import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
 
@@ -101,6 +101,7 @@ object BatchSession extends Logging {
             case 0 =>
             case exitCode =>
               warn(s"spark-submit exited with code $exitCode")
+              s.stateChanged(SparkApp.State.FAILED)
           }
         } finally {
           childProcesses.decrementAndGet()
@@ -182,6 +183,14 @@ class BatchSession(
   override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
     synchronized {
       debug(s"$this state changed from $oldState to $newState")
+      if (!_state.isInstanceOf[FinishedSessionState]) {
+        stateChanged(newState)
+      }
+    }
+  }
+
+  private def stateChanged(newState: SparkApp.State): Unit = {
+    synchronized {
       newState match {
         case SparkApp.State.RUNNING =>
           _state = SessionState.Running
diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 417b627..20b8a81 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -33,7 +33,7 @@ import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
 import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.SessionState
-import org.apache.livy.utils.{AppInfo, SparkApp}
+import org.apache.livy.utils.{AppInfo, Clock, SparkApp}
 
 class BatchSessionSpec
   extends FunSpec
@@ -56,6 +56,23 @@ class BatchSessionSpec
     script
   }
 
+  val runForeverScript: Path = {
+    val script = Files.createTempFile("livy-test-run-forever-script", ".py")
+    script.toFile.deleteOnExit()
+    val writer = new FileWriter(script.toFile)
+    try {
+      writer.write(
+        """
+          |import time
+          |while True:
+          | time.sleep(1)
+        """.stripMargin)
+    } finally {
+      writer.close()
+    }
+    script
+  }
+
   describe("A Batch process") {
     var sessionStore: SessionStore = null
 
@@ -102,6 +119,25 @@ class BatchSessionSpec
       batch.appInfo shouldEqual expectedAppInfo
     }
 
+    it("should end with status dead when batch session exits with no 0 return code") {
+      val req = new CreateBatchRequest()
+      req.file = runForeverScript.toString
+      req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
+
+      val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
+      val accessManager = new AccessManager(conf)
+      val batch = BatchSession.create(0, None, req, conf, accessManager, null, None, sessionStore)
+      batch.start()
+      Clock.sleep(2)
+      batch.stopSession()
+
+      Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS))
+      (batch.state match {
+        case SessionState.Dead(_) => true
+        case _ => false
+      }) should be (true)
+    }
+
     def testRecoverSession(name: Option[String]): Unit = {
       val conf = new LivyConf()
       val req = new CreateBatchRequest()