You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/09/18 08:36:14 UTC
incubator-livy git commit: [LIVY-396] Livy does not map YARN app states correctly
Repository: incubator-livy
Updated Branches:
refs/heads/master 52c89a9c4 -> 219fdac5c
[LIVY-396] Livy does not map YARN app states correctly
Treat any invalid combination of YARN `state` and `finalStatus` as an error:
such combinations should not occur, and if one does, put the application in the `FAILED` state.
Task-Url: https://issues.apache.org/jira/browse/LIVY-396
Author: Fathi Salmi, Meisam(mfathisalmi) <mf...@paypal.com>
Closes #39 from meisam/LIVY-396.
Change-Id: I839ef9f617cb08463f37eff803dbb99d681f852a
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/219fdac5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/219fdac5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/219fdac5
Branch: refs/heads/master
Commit: 219fdac5cfea1780cfb4e52f29777d3b21f8a55e
Parents: 52c89a9
Author: Fathi Salmi, Meisam(mfathisalmi) <mf...@paypal.com>
Authored: Mon Sep 18 16:36:03 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Mon Sep 18 16:36:03 2017 +0800
----------------------------------------------------------------------
.../org/apache/livy/utils/SparkYarnApp.scala | 34 ++++-----
.../apache/livy/utils/SparkYarnAppSpec.scala | 78 +++++++++++++++-----
2 files changed, 78 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/219fdac5/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 0ab6c44..804be54 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -206,23 +206,23 @@ class SparkYarnApp private[utils] (
appId: ApplicationId,
yarnAppState: YarnApplicationState,
finalAppStatus: FinalApplicationStatus): SparkApp.State.Value = {
- yarnAppState match {
- case (YarnApplicationState.NEW |
- YarnApplicationState.NEW_SAVING |
- YarnApplicationState.SUBMITTED |
- YarnApplicationState.ACCEPTED) => SparkApp.State.STARTING
- case YarnApplicationState.RUNNING => SparkApp.State.RUNNING
- case YarnApplicationState.FINISHED =>
- finalAppStatus match {
- case FinalApplicationStatus.SUCCEEDED => SparkApp.State.FINISHED
- case FinalApplicationStatus.FAILED => SparkApp.State.FAILED
- case FinalApplicationStatus.KILLED => SparkApp.State.KILLED
- case s =>
- error(s"Unknown YARN final status $appId $s")
- SparkApp.State.FAILED
- }
- case YarnApplicationState.FAILED => SparkApp.State.FAILED
- case YarnApplicationState.KILLED => SparkApp.State.KILLED
+ (yarnAppState, finalAppStatus) match {
+ case (YarnApplicationState.NEW, FinalApplicationStatus.UNDEFINED) |
+ (YarnApplicationState.NEW_SAVING, FinalApplicationStatus.UNDEFINED) |
+ (YarnApplicationState.SUBMITTED, FinalApplicationStatus.UNDEFINED) |
+ (YarnApplicationState.ACCEPTED, FinalApplicationStatus.UNDEFINED) =>
+ SparkApp.State.STARTING
+ case (YarnApplicationState.RUNNING, FinalApplicationStatus.UNDEFINED) =>
+ SparkApp.State.RUNNING
+ case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) =>
+ SparkApp.State.FINISHED
+ case (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED) =>
+ SparkApp.State.FAILED
+ case (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) =>
+ SparkApp.State.KILLED
+ case (state, finalStatus) => // any other combination is invalid, so FAIL the application.
+ error(s"Unknown YARN state $state for app $appId with final status $finalStatus.")
+ SparkApp.State.FAILED
}
}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/219fdac5/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index b3c50da..b747a9a 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -17,6 +17,7 @@
package org.apache.livy.utils
import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -60,19 +61,46 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
- when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
- // Simulate YARN app state progression.
- when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
- private var stateSeq = List(ACCEPTED, RUNNING, FINISHED)
- override def answer(invocation: InvocationOnMock): YarnApplicationState = {
- val currentState = stateSeq.head
- if (stateSeq.tail.nonEmpty) {
- stateSeq = stateSeq.tail
+ // Simulate YARN app state progression.
+ val applicationStateList = List(
+ ACCEPTED,
+ RUNNING,
+ FINISHED
+ )
+ val finalApplicationStatusList = List(
+ FinalApplicationStatus.UNDEFINED,
+ FinalApplicationStatus.UNDEFINED,
+ FinalApplicationStatus.SUCCEEDED
+ )
+ val stateIndex = new AtomicInteger(-1)
+ when(mockAppReport.getYarnApplicationState).thenAnswer(
+ // increment, then get: the index starts at -1, so the first call returns element 0
+ new Answer[YarnApplicationState] {
+ override def answer(invocationOnMock: InvocationOnMock): YarnApplicationState = {
+ stateIndex.incrementAndGet match {
+ case i if i < applicationStateList.size =>
+ applicationStateList(i)
+ case _ =>
+ applicationStateList.last
+ }
}
- currentState
}
- })
+ )
+ when(mockAppReport.getFinalApplicationStatus).thenAnswer(
+ new Answer[FinalApplicationStatus] {
+ override def answer(invocationOnMock: InvocationOnMock): FinalApplicationStatus = {
+ // do not increment here, only get
+ stateIndex.get match {
+ case i if i < applicationStateList.size =>
+ finalApplicationStatusList(i)
+ case _ =>
+ finalApplicationStatusList.last
+ }
+ }
+ }
+ )
+
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
val app = new SparkYarnApp(
@@ -210,11 +238,17 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
assert(app.mapYarnState(appId, RUNNING, UNDEFINED) == State.RUNNING)
assert(
app.mapYarnState(appId, FINISHED, FinalApplicationStatus.SUCCEEDED) == State.FINISHED)
- assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
- assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.KILLED)
+ assert(app.mapYarnState(appId, FAILED, FinalApplicationStatus.FAILED) == State.FAILED)
+ assert(app.mapYarnState(appId, KILLED, FinalApplicationStatus.KILLED) == State.KILLED)
+
+ // none of the (state, finalStatus) combinations below should happen
assert(app.mapYarnState(appId, FINISHED, UNDEFINED) == State.FAILED)
+ assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
+ assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.FAILED)
assert(app.mapYarnState(appId, FAILED, UNDEFINED) == State.FAILED)
- assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.KILLED)
+ assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.FAILED)
+ assert(app.mapYarnState(appId, FAILED, FinalApplicationStatus.SUCCEEDED) == State.FAILED)
+ assert(app.mapYarnState(appId, KILLED, FinalApplicationStatus.SUCCEEDED) == State.FAILED)
}
}
@@ -311,20 +345,30 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
val mockYarnClient = mock[YarnClient]
val mockAppReport = mock[ApplicationReport]
val mockApplicationAttemptId = mock[ApplicationAttemptId]
- var done = false
+ val done = new AtomicBoolean(false)
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getYarnApplicationState).thenAnswer(
new Answer[YarnApplicationState]() {
override def answer(invocation: InvocationOnMock): YarnApplicationState = {
- if (done) {
+ if (done.get()) {
FINISHED
} else {
RUNNING
}
}
})
- when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+ when(mockAppReport.getFinalApplicationStatus).thenAnswer(
+ new Answer[FinalApplicationStatus]() {
+ override def answer(invocation: InvocationOnMock): FinalApplicationStatus = {
+ if (done.get()) {
+ FinalApplicationStatus.SUCCEEDED
+ } else {
+ FinalApplicationStatus.UNDEFINED
+ }
+ }
+ })
+
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
@@ -342,7 +386,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
cleanupThread(app.yarnAppMonitorThread) {
pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
assert(app.state == SparkApp.State.RUNNING)
- done = true
+ done.set(true)
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
}