You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/09/26 05:25:20 UTC

incubator-livy git commit: [LIVY-406][SERVER] Fix issue where Livy cannot get app id in yarn client mode

Repository: incubator-livy
Updated Branches:
  refs/heads/master 1bd92b9f4 -> 4a537e24d


[LIVY-406][SERVER] Fix issue where Livy cannot get app id in yarn client mode

In our `SparkYarnApp` logic, we wait for the spark-submit process to exit before querying the app id. Because that process never exits in yarn client mode, the wait blocks the follow-up logic that gets the app id and other YARN application information.

This change proposes removing that wait. Because we will now start querying the app id as soon as the app is launched, we should also increase the app look-up timeout to avoid timing out.

Author: jerryshao <ss...@hortonworks.com>

Closes #50 from jerryshao/LIVY-406.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/4a537e24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/4a537e24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/4a537e24

Branch: refs/heads/master
Commit: 4a537e24d605766f901232760f39b44adae817a2
Parents: 1bd92b9
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Sep 26 13:25:11 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Tue Sep 26 13:25:11 2017 +0800

----------------------------------------------------------------------
 conf/livy.conf.template                         |  4 +--
 .../main/scala/org/apache/livy/LivyConf.scala   |  2 +-
 .../org/apache/livy/utils/SparkYarnApp.scala    |  9 ------
 .../apache/livy/utils/SparkYarnAppSpec.scala    | 30 ++++++++++++++++++++
 4 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/4a537e24/conf/livy.conf.template
----------------------------------------------------------------------
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 2425059..86ca9ab 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -107,7 +107,7 @@
 # livy.server.recovery.state-store.url =
 
 # If Livy can't find the yarn app within this time, consider it lost.
-# livy.server.yarn.app-lookup-timeout = 60s
+# livy.server.yarn.app-lookup-timeout = 120s
 # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
 # cause session leakage, so we need to check session leakage.
 # How long to check livy session leakage
@@ -116,7 +116,7 @@
 # livy.server.yarn.app-leakage.check-interval = 60s
 
 # How often Livy polls YARN to refresh YARN app state.
-# livy.server.yarn.poll-interval = 1s
+# livy.server.yarn.poll-interval = 5s
 #
 # Days to keep Livy server request logs.
 # livy.server.request-log-retain.days = 5

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/4a537e24/server/src/main/scala/org/apache/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 46af8e8..0cfc8f3 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -123,7 +123,7 @@ object LivyConf {
   val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
 
   // If Livy can't find the yarn app within this time, consider it lost.
-  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")
+  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
 
   // How often Livy polls YARN to refresh YARN app state.
   val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/4a537e24/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
index 804be54..91c70ca 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
@@ -231,15 +231,6 @@ class SparkYarnApp private[utils] (
   // batch YARN queries.
   private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") {
     try {
-      // Wait for spark-submit to finish submitting the app to YARN.
-      process.foreach { p =>
-        val exitCode = p.waitFor()
-        if (exitCode != 0) {
-          throw new Exception(s"spark-submit exited with code $exitCode}.\n" +
-            s"${process.get.inputLines.mkString("\n")}")
-        }
-      }
-
       // If appId is not known, query YARN by appTag to get it.
       val appId = try {
         appIdOption.map(ConverterUtils.toApplicationId).getOrElse {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/4a537e24/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
index b747a9a..e0aa0ae 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
@@ -19,6 +19,7 @@ package org.apache.livy.utils
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -252,6 +253,35 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
       }
     }
 
+    it("should get App Id") {
+      Clock.withSleepMethod(mockSleep) {
+        val mockYarnClient = mock[YarnClient]
+        val mockAppReport = mock[ApplicationReport]
+
+        when(mockAppReport.getApplicationTags).thenReturn(Set(appTag.toLowerCase).asJava)
+        when(mockAppReport.getApplicationId).thenReturn(appId)
+        when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
+        when(mockAppReport.getYarnApplicationState).thenReturn(RUNNING)
+        when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
+        when(mockYarnClient.getApplications(Set("SPARK").asJava))
+          .thenReturn(List(mockAppReport).asJava)
+
+        val mockListener = mock[SparkAppListener]
+        val mockSparkSubmit = mock[LineBufferedProcess]
+        val app = new SparkYarnApp(
+          appTag, None, Some(mockSparkSubmit), Some(mockListener), livyConf, mockYarnClient)
+
+        cleanupThread(app.yarnAppMonitorThread) {
+          app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
+          assert(!app.yarnAppMonitorThread.isAlive,
+            "YarnAppMonitorThread should terminate after YARN app is finished.")
+
+          verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
+          verify(mockListener).appIdKnown(appId.toString)
+        }
+      }
+    }
+
     it("should expose driver log url and Spark UI url") {
       Clock.withSleepMethod(mockSleep) {
         val mockYarnClient = mock[YarnClient]