You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/11/07 08:29:39 UTC

incubator-livy git commit: [LIVY-412][SERVER] Reject create-session requests if too many sessions are being created.

Repository: incubator-livy
Updated Branches:
  refs/heads/master 5e0201f60 -> ef5dccbb0


[LIVY-412][SERVER] Reject create-session requests if too many sessions are being created.

## What changes were proposed in this pull request?

In our cluster, the Livy server runs Spark in YARN cluster mode. When createSession requests arrive too frequently, the Livy server starts too many spark-submit child processes, which can cause the machine to run out of memory (OOM).

This patch rejects create-session requests when there are too many running spark-submit child processes.

## How was this patch tested?

Added two test cases that create an interactive session and a batch session after the maximum number of creating sessions has been reached; in both cases the request should fail.

Please review https://livy.incubator.apache.org/community/ before opening a pull request.

Author: 沈洪 <yu...@alipay.com>

Closes #58 from shenh062326/livy-412.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/ef5dccbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/ef5dccbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/ef5dccbb

Branch: refs/heads/master
Commit: ef5dccbb0d15797a4bc009b78c88e3b40ff79b7d
Parents: 5e0201f
Author: 沈洪 <yu...@alipay.com>
Authored: Tue Nov 7 16:29:32 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Tue Nov 7 16:29:32 2017 +0800

----------------------------------------------------------------------
 .../org/apache/livy/rsc/ContextLauncher.java    |  3 +++
 .../org/apache/livy/rsc/RSCClientFactory.java   |  6 +++++
 .../main/scala/org/apache/livy/LivyConf.scala   |  2 ++
 .../org/apache/livy/server/SessionServlet.scala | 28 ++++++++++++++------
 .../apache/livy/server/batch/BatchSession.scala | 22 ++++++++++++++-
 .../livy/server/batch/BatchServletSpec.scala    | 27 +++++++++++++++++--
 .../InteractiveSessionServletSpec.scala         | 27 ++++++++++++++++++-
 7 files changed, 103 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index ed42e48..f6034a7 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -374,6 +374,7 @@ class ContextLauncher {
         @Override
         public void run() {
           try {
+            RSCClientFactory.childProcesses().incrementAndGet();
             int exitCode = child.waitFor();
             if (exitCode != 0) {
               LOG.warn("Child process exited with code {}.", exitCode);
@@ -385,6 +386,8 @@ class ContextLauncher {
             child.destroy();
           } catch (Exception e) {
             LOG.warn("Exception while waiting for child process.", e);
+          } finally {
+            RSCClientFactory.childProcesses().decrementAndGet();
           }
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
index c6327e2..d9d56fb 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
@@ -36,6 +36,12 @@ public final class RSCClientFactory implements LivyClientFactory {
 
   private final AtomicInteger refCount = new AtomicInteger();
   private RpcServer server = null;
+  // interactive session child processes number
+  private static AtomicInteger iscpn = new AtomicInteger();
+
+  public static AtomicInteger childProcesses() {
+    return iscpn;
+  }
 
   /**
    * Creates a local Livy client if the URI has the "rsc" scheme.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/server/src/main/scala/org/apache/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 0cfc8f3..48ea7dd 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -148,6 +148,8 @@ object LivyConf {
   val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h")
   // How long a finished session state will be kept in memory
   val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")
+  // Max creating session in livyServer
+  val SESSION_MAX_CREATION = Entry("livy.server.session.max-creation", 100)
 
   val SPARK_MASTER = "spark.master"
   val SPARK_DEPLOY_MODE = "spark.submit.deployMode"

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index 76b5afa..69418a8 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -24,6 +24,8 @@ import scala.concurrent._
 import scala.concurrent.duration._
 
 import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.rsc.RSCClientFactory
+import org.apache.livy.server.batch.BatchSession
 import org.apache.livy.sessions.{Session, SessionManager}
 import org.apache.livy.sessions.Session.RecoveryMetadata
 
@@ -38,7 +40,7 @@ object SessionServlet extends Logging
  */
 abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
     private[livy] val sessionManager: SessionManager[S, R],
-    livyConf: LivyConf,
+    val livyConf: LivyConf,
     accessManager: AccessManager)
   extends JsonServlet
   with ApiVersioningSupport
@@ -117,14 +119,24 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
     }
   }
 
+  def tooManySessions(): Boolean = {
+    val totalChildProceses = RSCClientFactory.childProcesses().get() +
+      BatchSession.childProcesses.get()
+    totalChildProceses >= livyConf.getInt(LivyConf.SESSION_MAX_CREATION)
+  }
+
   post("/") {
-    val session = sessionManager.register(createSession(request))
-    // Because it may take some time to establish the session, update the last activity
-    // time before returning the session info to the client.
-    session.recordActivity()
-    Created(clientSessionView(session, request),
-      headers = Map("Location" ->
-        (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
+    if (tooManySessions) {
+      BadRequest("Rejected, too many sessions are being created!")
+    } else {
+      val session = sessionManager.register(createSession(request))
+      // Because it may take some time to establish the session, update the last activity
+      // time before returning the session info to the client.
+      session.recordActivity()
+      Created(clientSessionView(session, request),
+        headers = Map("Location" ->
+          (getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
+    }
   }
 
   private def getRequestPathInfo(request: HttpServletRequest): String = {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 2605bf5..29a8d8c 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -18,14 +18,16 @@
 package org.apache.livy.server.batch
 
 import java.lang.ProcessBuilder.Redirect
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
 import scala.util.Random
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 
-import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.{LivyConf, Logging, Utils}
 import org.apache.livy.server.recovery.SessionStore
+import org.apache.livy.server.SessionServlet
 import org.apache.livy.sessions.{Session, SessionState}
 import org.apache.livy.sessions.Session._
 import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
@@ -42,6 +44,12 @@ case class BatchRecoveryMetadata(
 
 object BatchSession extends Logging {
   val RECOVERY_SESSION_TYPE = "batch"
+  // batch session child processes number
+  private val bscpn = new AtomicInteger
+
+  def childProcesses(): AtomicInteger = {
+    bscpn
+  }
 
   def create(
       id: Int,
@@ -85,6 +93,18 @@ object BatchSession extends Logging {
       val file = resolveURIs(Seq(request.file), livyConf)(0)
       val sparkSubmit = builder.start(Some(file), request.args)
 
+      Utils.startDaemonThread(s"batch-session-process-$id") {
+        childProcesses.incrementAndGet()
+        try {
+          sparkSubmit.waitFor() match {
+            case 0 =>
+            case exitCode =>
+              warn(s"spark-submit exited with code $exitCode")
+          }
+        } finally {
+          childProcesses.decrementAndGet()
+        }
+      }
       SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
index 8660448..3402cff 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
@@ -28,7 +28,7 @@ import scala.concurrent.duration.Duration
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar.mock
 
-import org.apache.livy.Utils
+import org.apache.livy.{LivyConf, Utils}
 import org.apache.livy.server.{AccessManager, BaseSessionServletSpec}
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.{BatchSessionManager, SessionState}
@@ -146,6 +146,29 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover
       view.appInfo shouldEqual appInfo
       view.log shouldEqual log
     }
-  }
 
+    it("should fail session creation when max session creation is hit") {
+      val createRequest = new CreateBatchRequest()
+      createRequest.file = script.toString
+      createRequest.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
+
+      jpost[Map[String, Any]]("/", createRequest) { data =>
+        header("Location") should equal("/2")
+        data("id") should equal (2)
+
+        val batch = servlet.sessionManager.get(2)
+        batch should be (defined)
+      }
+
+      servlet.livyConf.set(LivyConf.SESSION_MAX_CREATION, 1)
+      jpost[Map[String, Any]]("/", createRequest, SC_BAD_REQUEST) { data => None }
+
+      jdelete[Map[String, Any]]("/2") { data =>
+        data should equal (Map("msg" -> "deleted"))
+
+        val batch = servlet.sessionManager.get(2)
+        batch should not be defined
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/ef5dccbb/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index e1de22e..d42d78c 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -21,14 +21,16 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration._
 import scala.concurrent.Future
 
 import org.json4s.jackson.Json4sScalaModule
 import org.mockito.Matchers._
-import org.mockito.Mockito._
+import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.Entry
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.mock.MockitoSugar.mock
 
 import org.apache.livy.{ExecuteRequest, LivyConf}
@@ -52,6 +54,8 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
     private var statements = IndexedSeq[Statement]()
 
     override protected def createSession(req: HttpServletRequest): InteractiveSession = {
+      super.createSession(req)
+
       val statementCounter = new AtomicInteger()
 
       val session = mock[InteractiveSession]
@@ -183,4 +187,25 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
     view.log shouldEqual log.asJava
   }
 
+  private def waitSession(): Unit = {
+    eventually(timeout(1 minute), interval(100 millis)) {
+      servlet.tooManySessions should be(true)
+    }
+  }
+
+  it("should failed create session when too many creating session ") {
+    var id = 1
+    jpost[SessionInfo]("/", createRequest(inProcess = false)) { data =>
+      id = data.id
+    }
+
+    servlet.livyConf.set(LivyConf.SESSION_MAX_CREATION, 1)
+
+    waitSession
+    jpost[Map[String, Any]]("/", createRequest(), HttpServletResponse.SC_BAD_REQUEST) { data =>
+      None
+    }
+
+    jdelete[Map[String, Any]](s"/${id}") { _ => }
+  }
 }