You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2019/08/15 03:53:43 UTC
[incubator-livy] branch master updated: [LIVY-620] Spark batch
session always ends with success when configuration is master yarn and
deploy-mode client
This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 01da43d [LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client
01da43d is described below
commit 01da43dba07aee1e4d13a2a19f233a38546ddec0
Author: Gustavo Martin Morcuende <gu...@gmail.com>
AuthorDate: Thu Aug 15 11:53:20 2019 +0800
[LIVY-620] Spark batch session always ends with success when configuration is master yarn and deploy-mode client
## What changes were proposed in this pull request?
The batch session should end in the dead state when the process exits with a non-zero return code.
https://issues.apache.org/jira/browse/LIVY-620
## How was this patch tested?
1. Unit Test (included in this PR)
Submit a batch session that runs forever, wait 2 seconds, kill that batch session, and expect the state to be dead.
2. Also currently used in a production environment.
Author: Gustavo Martin Morcuende <gu...@gmail.com>
Closes #192 from gumartinm/master.
---
.../apache/livy/server/batch/BatchSession.scala | 11 ++++++-
.../livy/server/batch/BatchSessionSpec.scala | 38 +++++++++++++++++++++-
2 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 4b27058..2a55c04 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
-import org.apache.livy.sessions.{Session, SessionState}
+import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState}
import org.apache.livy.sessions.Session._
import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
@@ -101,6 +101,7 @@ object BatchSession extends Logging {
case 0 =>
case exitCode =>
warn(s"spark-submit exited with code $exitCode")
+ s.stateChanged(SparkApp.State.FAILED)
}
} finally {
childProcesses.decrementAndGet()
@@ -182,6 +183,14 @@ class BatchSession(
override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
synchronized {
debug(s"$this state changed from $oldState to $newState")
+ if (!_state.isInstanceOf[FinishedSessionState]) {
+ stateChanged(newState)
+ }
+ }
+ }
+
+ private def stateChanged(newState: SparkApp.State): Unit = {
+ synchronized {
newState match {
case SparkApp.State.RUNNING =>
_state = SessionState.Running
diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 417b627..20b8a81 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -33,7 +33,7 @@ import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.SessionState
-import org.apache.livy.utils.{AppInfo, SparkApp}
+import org.apache.livy.utils.{AppInfo, Clock, SparkApp}
class BatchSessionSpec
extends FunSpec
@@ -56,6 +56,23 @@ class BatchSessionSpec
script
}
+ val runForeverScript: Path = {
+ val script = Files.createTempFile("livy-test-run-forever-script", ".py")
+ script.toFile.deleteOnExit()
+ val writer = new FileWriter(script.toFile)
+ try {
+ writer.write(
+ """
+ |import time
+ |while True:
+ | time.sleep(1)
+ """.stripMargin)
+ } finally {
+ writer.close()
+ }
+ script
+ }
+
describe("A Batch process") {
var sessionStore: SessionStore = null
@@ -102,6 +119,25 @@ class BatchSessionSpec
batch.appInfo shouldEqual expectedAppInfo
}
+ it("should end with status dead when batch session exits with no 0 return code") {
+ val req = new CreateBatchRequest()
+ req.file = runForeverScript.toString
+ req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
+
+ val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
+ val accessManager = new AccessManager(conf)
+ val batch = BatchSession.create(0, None, req, conf, accessManager, null, None, sessionStore)
+ batch.start()
+ Clock.sleep(2)
+ batch.stopSession()
+
+ Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS))
+ (batch.state match {
+ case SessionState.Dead(_) => true
+ case _ => false
+ }) should be (true)
+ }
+
def testRecoverSession(name: Option[String]): Unit = {
val conf = new LivyConf()
val req = new CreateBatchRequest()