You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/09/18 08:36:14 UTC

incubator-livy git commit: [LIVY-396] Livy does not map YARN app states correctly

Repository: incubator-livy
Updated Branches:
  refs/heads/master 52c89a9c4 -> 219fdac5c


[LIVY-396] Livy does not map YARN app states correctly

Map only the valid combinations of YARN `state` and `finalStatus` to the
corresponding Livy state; treat any other (invalid) combination as an
error and put the application in the `FAILED` state.

Task-Url: https://issues.apache.org/jira/browse/LIVY-396

Author: Fathi Salmi, Meisam(mfathisalmi) <mf...@paypal.com>

Closes #39 from meisam/LIVY-396.

Change-Id: I839ef9f617cb08463f37eff803dbb99d681f852a


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/219fdac5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/219fdac5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/219fdac5

Branch: refs/heads/master
Commit: 219fdac5cfea1780cfb4e52f29777d3b21f8a55e
Parents: 52c89a9
Author: Fathi Salmi, Meisam(mfathisalmi) <mf...@paypal.com>
Authored: Mon Sep 18 16:36:03 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Mon Sep 18 16:36:03 2017 +0800

----------------------------------------------------------------------
 .../org/apache/livy/utils/SparkYarnApp.scala    | 34 ++++-----
 .../apache/livy/utils/SparkYarnAppSpec.scala    | 78 +++++++++++++++-----
 2 files changed, 78 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/219fdac5/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 0ab6c44..804be54 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -206,23 +206,23 @@ class SparkYarnApp private[utils] (
       appId: ApplicationId,
       yarnAppState: YarnApplicationState,
       finalAppStatus: FinalApplicationStatus): SparkApp.State.Value = {
-    yarnAppState match {
-      case (YarnApplicationState.NEW |
-            YarnApplicationState.NEW_SAVING |
-            YarnApplicationState.SUBMITTED |
-            YarnApplicationState.ACCEPTED) => SparkApp.State.STARTING
-      case YarnApplicationState.RUNNING => SparkApp.State.RUNNING
-      case YarnApplicationState.FINISHED =>
-        finalAppStatus match {
-          case FinalApplicationStatus.SUCCEEDED => SparkApp.State.FINISHED
-          case FinalApplicationStatus.FAILED => SparkApp.State.FAILED
-          case FinalApplicationStatus.KILLED => SparkApp.State.KILLED
-          case s =>
-            error(s"Unknown YARN final status $appId $s")
-            SparkApp.State.FAILED
-        }
-      case YarnApplicationState.FAILED => SparkApp.State.FAILED
-      case YarnApplicationState.KILLED => SparkApp.State.KILLED
+    (yarnAppState, finalAppStatus) match {
+      case (YarnApplicationState.NEW, FinalApplicationStatus.UNDEFINED) |
+           (YarnApplicationState.NEW_SAVING, FinalApplicationStatus.UNDEFINED) |
+           (YarnApplicationState.SUBMITTED, FinalApplicationStatus.UNDEFINED) |
+           (YarnApplicationState.ACCEPTED, FinalApplicationStatus.UNDEFINED) =>
+        SparkApp.State.STARTING
+      case (YarnApplicationState.RUNNING, FinalApplicationStatus.UNDEFINED) =>
+        SparkApp.State.RUNNING
+      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) =>
+        SparkApp.State.FINISHED
+      case (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED) =>
+        SparkApp.State.FAILED
+      case (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) =>
+        SparkApp.State.KILLED
+      case (state, finalStatus) => // any other combination is invalid, so FAIL the application.
+        error(s"Unknown YARN state $state for app $appId with final status $finalStatus.")
+        SparkApp.State.FAILED
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/219fdac5/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index b3c50da..b747a9a 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -17,6 +17,7 @@
 package org.apache.livy.utils
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -60,19 +61,46 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
 
         val mockAppReport = mock[ApplicationReport]
         when(mockAppReport.getApplicationId).thenReturn(appId)
-        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
-        // Simulate YARN app state progression.
-        when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
-          private var stateSeq = List(ACCEPTED, RUNNING, FINISHED)
 
-          override def answer(invocation: InvocationOnMock): YarnApplicationState = {
-            val currentState = stateSeq.head
-            if (stateSeq.tail.nonEmpty) {
-              stateSeq = stateSeq.tail
+        // Simulate YARN app state progression.
+        val applicationStateList = List(
+          ACCEPTED,
+          RUNNING,
+          FINISHED
+        )
+        val finalApplicationStatusList = List(
+          FinalApplicationStatus.UNDEFINED,
+          FinalApplicationStatus.UNDEFINED,
+          FinalApplicationStatus.SUCCEEDED
+       )
+        val stateIndex = new AtomicInteger(-1)
+        when(mockAppReport.getYarnApplicationState).thenAnswer(
+          // increment, then get: the index starts at -1, so the first call yields element 0
+          new Answer[YarnApplicationState] {
+            override def answer(invocationOnMock: InvocationOnMock): YarnApplicationState = {
+              stateIndex.incrementAndGet match {
+                case i if i < applicationStateList.size =>
+                  applicationStateList(i)
+                case _ =>
+                  applicationStateList.last
+              }
             }
-            currentState
           }
-        })
+        )
+        when(mockAppReport.getFinalApplicationStatus).thenAnswer(
+          new Answer[FinalApplicationStatus] {
+            override def answer(invocationOnMock: InvocationOnMock): FinalApplicationStatus = {
+              // do not increment here, only get
+              stateIndex.get match {
+                case i if i < applicationStateList.size =>
+                  finalApplicationStatusList(i)
+                case _ =>
+                  finalApplicationStatusList.last
+              }
+            }
+          }
+        )
+
         when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
 
         val app = new SparkYarnApp(
@@ -210,11 +238,17 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
         assert(app.mapYarnState(appId, RUNNING, UNDEFINED) == State.RUNNING)
         assert(
           app.mapYarnState(appId, FINISHED, FinalApplicationStatus.SUCCEEDED) == State.FINISHED)
-        assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
-        assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.KILLED)
+        assert(app.mapYarnState(appId, FAILED, FinalApplicationStatus.FAILED) == State.FAILED)
+        assert(app.mapYarnState(appId, KILLED, FinalApplicationStatus.KILLED) == State.KILLED)
+
+        // None of the (state, finalStatus) combinations below should happen; all map to FAILED.
         assert(app.mapYarnState(appId, FINISHED, UNDEFINED) == State.FAILED)
+        assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
+        assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.FAILED)
         assert(app.mapYarnState(appId, FAILED, UNDEFINED) == State.FAILED)
-        assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.KILLED)
+        assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.FAILED)
+        assert(app.mapYarnState(appId, FAILED, FinalApplicationStatus.SUCCEEDED) == State.FAILED)
+        assert(app.mapYarnState(appId, KILLED, FinalApplicationStatus.SUCCEEDED) == State.FAILED)
       }
     }
 
@@ -311,20 +345,30 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
         val mockYarnClient = mock[YarnClient]
         val mockAppReport = mock[ApplicationReport]
         val mockApplicationAttemptId = mock[ApplicationAttemptId]
-        var done = false
+        val done = new AtomicBoolean(false)
 
         when(mockAppReport.getApplicationId).thenReturn(appId)
         when(mockAppReport.getYarnApplicationState).thenAnswer(
           new Answer[YarnApplicationState]() {
             override def answer(invocation: InvocationOnMock): YarnApplicationState = {
-              if (done) {
+              if (done.get()) {
                 FINISHED
               } else {
                 RUNNING
               }
             }
           })
-        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getFinalApplicationStatus).thenAnswer(
+          new Answer[FinalApplicationStatus]() {
+            override def answer(invocation: InvocationOnMock): FinalApplicationStatus = {
+              if (done.get()) {
+                FinalApplicationStatus.SUCCEEDED
+              } else {
+                FinalApplicationStatus.UNDEFINED
+              }
+            }
+          })
+
         when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
         when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
 
@@ -342,7 +386,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
         cleanupThread(app.yarnAppMonitorThread) {
           pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
           assert(app.state == SparkApp.State.RUNNING)
-          done = true
+          done.set(true)
 
           app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
         }