You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2019/12/19 02:49:24 UTC

[incubator-livy] branch master updated: [LIVY-729] Fix livy recover the killed session

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c656aa  [LIVY-729] Fix livy recover the killed session
1c656aa is described below

commit 1c656aad3495814fb55b5c2f5be708642b6174f3
Author: runzhiwang <ru...@tencent.com>
AuthorDate: Thu Dec 19 10:49:13 2019 +0800

    [LIVY-729] Fix livy recover the killed session
    
    ## What changes were proposed in this pull request?
    
    The following steps reproduce the problem:
    
    1. Set livy.server.recovery.mode=recovery and create an interactive session, session0, in yarn-cluster mode.
    2. Kill the YARN application of the session.
    3. Restart Livy.
    4. Livy tries to recover session0, but the application has been killed and the driver no longer exists, so the client cannot connect to the driver and an exception is thrown, as shown in the image below.
    5. If the ip:port of the driver has been reused by session1, the client of session0 will try to connect to the driver of session1, and that driver will throw the exception: Unexpected client ID.
    6. The exceptions thrown by both Livy and the driver will confuse the user, and trying to recover a lot of killed sessions will delay the recovery of the sessions that are still alive.
    ![image](https://user-images.githubusercontent.com/51938049/71066615-f0216280-21ad-11ea-9559-0cc0bd7d7546.png)
    
    How to fix:
    When a session is killed or dead, remove the session from the store.
    
    ## How was this patch tested?
    
    Existing integration tests (IT) and unit tests (UT).
    
    Author: runzhiwang <ru...@tencent.com>
    
    Closes #266 from runzhiwang/remove-node-session-fail.
---
 .../scala/org/apache/livy/server/batch/BatchSession.scala     | 10 ++++++++--
 .../apache/livy/server/interactive/InteractiveSession.scala   | 11 +++++++++++
 .../apache/livy/server/recovery/FileSystemStateStore.scala    |  6 +++++-
 .../org/apache/livy/server/recovery/ZooKeeperStateStore.scala |  2 +-
 4 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index c94fc04..16f9d4d 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -188,8 +188,14 @@ class BatchSession(
           info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " +
             s"info: ${appInfo.asJavaMap}]")
         case SparkApp.State.FINISHED => _state = SessionState.Success()
-        case SparkApp.State.KILLED => _state = SessionState.Killed()
-        case SparkApp.State.FAILED => _state = SessionState.Dead()
+        case SparkApp.State.KILLED => {
+          _state = SessionState.Killed()
+          sessionStore.remove(RECOVERY_SESSION_TYPE, id)
+        }
+        case SparkApp.State.FAILED => {
+          _state = SessionState.Dead()
+          sessionStore.remove(RECOVERY_SESSION_TYPE, id)
+        }
         case _ =>
       }
     }
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index cdeddda..4b318b8 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -577,6 +577,17 @@ class InteractiveSession(
     // Since these 2 transitions are triggered by different threads, there's a race condition.
     // Make sure we won't transit from dead to error state.
     val areSameStates = serverSideState.getClass() == newState.getClass()
+
+    if (!areSameStates) {
+      newState match {
+        case _: SessionState.Killed | _: SessionState.Dead =>
+          sessionStore.remove(RECOVERY_SESSION_TYPE, id)
+        case SessionState.ShuttingDown =>
+          sessionStore.remove(RECOVERY_SESSION_TYPE, id)
+        case _ =>
+      }
+    }
+
     val transitFromInactiveToActive = !serverSideState.isActive && newState.isActive
     if (!areSameStates && !transitFromInactiveToActive) {
       debug(s"$this session state change from ${serverSideState} to $newState")
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
index d5f8f3d..ff5185b 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -125,7 +125,11 @@ class FileSystemStateStore(
   }
 
   override def remove(key: String): Unit = {
-    fileContext.delete(absPath(key), false)
+    try {
+      fileContext.delete(absPath(key), false)
+    } catch {
+      case _: FileNotFoundException => warn(s"Failed to remove non-existed file: ${key}")
+    }
   }
 
   private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
index ec6b9df..1b93b7a 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
@@ -110,7 +110,7 @@ class ZooKeeperStateStore(
     try {
       curatorClient.delete().guaranteed().forPath(prefixKey(key))
     } catch {
-      case _: NoNodeException =>
+      case _: NoNodeException => warn(s"Fail to remove non-existed zookeeper node: ${key}")
     }
   }