You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2019/08/27 03:56:33 UTC

[incubator-livy] branch master updated: [LIVY-620][LIVY-641] Fix travis failed on test: should end with status dead when batch session exits with no 0 return code

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b92397  [LIVY-620][LIVY-641] Fix travis failed on test: should end with status dead when batch session exits with no 0 return code
8b92397 is described below

commit 8b92397d55c55615c7110cc7e2602d0106f6eac7
Author: runzhiwang <ru...@tencent.com>
AuthorDate: Tue Aug 27 11:56:00 2019 +0800

    [LIVY-620][LIVY-641] Fix travis failed on test: should end with status dead when batch session exits with no 0 return code
    
    ## What changes were proposed in this pull request?
    
    Fix travis failed on test: should end with status dead when batch session exits with no 0 return code
    
    1. Travis failed because the thread in SparkProcApp.scala and the thread in BatchSession.scala changed SessionState to different values when stopSession was called in the test.
    
    2. The changes to BatchSession.scala revert the commit https://github.com/apache/incubator-livy/commit/01da43dba07aee1e4d13a2a19f233a38546ddec0.
    
    3. The changes to SparkYarnApp.scala and BatchSessionSpec.scala are new changes.
    
    ## How was this patch tested?
    
    1. Existing UT and IT.
    2. Created a batch session in YARN, killed SparkSubmit, then checked the SessionState.
    
    Author: runzhiwang <ru...@tencent.com>
    
    Closes #214 from runzhiwang/LIVY-641-SESSION-STATUS.
---
 .../main/scala/org/apache/livy/server/batch/BatchSession.scala |  9 ---------
 server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala | 10 ++++++++++
 .../scala/org/apache/livy/server/batch/BatchSessionSpec.scala  |  4 ++--
 3 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 2a55c04..c94fc04 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -101,7 +101,6 @@ object BatchSession extends Logging {
             case 0 =>
             case exitCode =>
               warn(s"spark-submit exited with code $exitCode")
-              s.stateChanged(SparkApp.State.FAILED)
           }
         } finally {
           childProcesses.decrementAndGet()
@@ -183,14 +182,6 @@ class BatchSession(
   override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
     synchronized {
       debug(s"$this state changed from $oldState to $newState")
-      if (!_state.isInstanceOf[FinishedSessionState]) {
-        stateChanged(newState)
-      }
-    }
-  }
-
-  private def stateChanged(newState: SparkApp.State): Unit = {
-    synchronized {
       newState match {
         case SparkApp.State.RUNNING =>
           _state = SessionState.Running
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index d255796..06d00a0 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -122,6 +122,7 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
 
+  private var killed = false
   private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
@@ -132,6 +133,7 @@ class SparkYarnApp private[utils] (
     ("\nYARN Diagnostics: " +: yarnDiagnostics)
 
   override def kill(): Unit = synchronized {
+    killed = true
     if (isRunning) {
       try {
         val timeout = SparkYarnApp.getYarnTagToAppIdTimeout(livyConf)
@@ -265,6 +267,14 @@ class SparkYarnApp private[utils] (
             appReport.getYarnApplicationState,
             appReport.getFinalApplicationStatus))
 
+          if (process.isDefined && !process.get.isAlive && process.get.exitValue() != 0) {
+            if (killed) {
+              changeState(SparkApp.State.KILLED)
+            } else {
+              changeState(SparkApp.State.FAILED)
+            }
+          }
+
           val latestAppInfo = {
             val attempt =
               yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 20b8a81..bc9ddc4 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -119,7 +119,7 @@ class BatchSessionSpec
       batch.appInfo shouldEqual expectedAppInfo
     }
 
-    it("should end with status dead when batch session exits with no 0 return code") {
+    it("should end with status killed when batch session was stopped") {
       val req = new CreateBatchRequest()
       req.file = runForeverScript.toString
       req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
@@ -133,7 +133,7 @@ class BatchSessionSpec
 
       Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS))
       (batch.state match {
-        case SessionState.Dead(_) => true
+        case SessionState.Killed(_) => true
         case _ => false
       }) should be (true)
     }