You are viewing a plain-text version of this content. The canonical link to it (the original "here" hyperlink) is not preserved in this plain-text version.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/10/25 07:23:04 UTC

[zeppelin] branch recover_interpretergroup created (now 8f2cabd)

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a change to branch recover_interpretergroup
in repository https://gitbox.apache.org/repos/asf/zeppelin.git.


      at 8f2cabd  Correct tests

This branch includes the following new commits:

     new c11f108  Close interpreter group session if interpreter creation failed
     new 66064de  Restart interpreter process if not running
     new 4c3f8a1  Restart a crashed interpreter process
     new 8f2cabd  Correct tests

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[zeppelin] 04/04: Correct tests

Posted by pd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch recover_interpretergroup
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit 8f2cabd4325a473ccfd969e82a53cced02064844
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Mon Oct 25 09:22:56 2021 +0200

    Correct tests
---
 .../java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 9246bf5..40abdf4 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -1092,6 +1092,8 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
               p1.getReturn().message().get(0).getData().contains("No such file or directory"));
 
       // run it again, and get the same error
+      note.run(p.getId(), true);
+      assertEquals(Status.FINISHED, p.getStatus());
       note.run(p1.getId(), true);
       assertEquals(Status.ERROR, p1.getStatus());
       assertTrue("Actual error message: " + p1.getReturn().message().get(0).getData(),

[zeppelin] 02/04: Restart interpreter process if not running

Posted by pd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch recover_interpretergroup
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit 66064de17102f378261c3b6f90c257f735f7e887
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Thu Oct 21 15:56:13 2021 +0200

    Restart interpreter process if not running
---
 .../org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java  | 7 +++++++
 .../org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java | 8 ++++----
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index fb17542..af7e7e8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -61,6 +61,12 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
                                                                 Properties properties)
       throws IOException {
     synchronized (interpreterProcessCreationLock) {
+      // Stop the interpreterProcess to cleanup the state if not running
+      if (remoteInterpreterProcess != null && !remoteInterpreterProcess.isRunning()) {
+        LOGGER.info("InterpreterProcess for InterpreterGroup {} is not running. Stop the interpreter process to clean up the state. Error message: {}", getId(), remoteInterpreterProcess.getErrorMessage());
+        remoteInterpreterProcess.stop();
+        remoteInterpreterProcess = null;
+      }
       if (remoteInterpreterProcess == null) {
         LOGGER.info("Create InterpreterProcess for InterpreterGroup: {}", getId());
         remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName,
@@ -183,6 +189,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
     }
   }
 
+  @Override
   public boolean isEmpty() {
     return this.sessions.isEmpty();
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 16e47c3..439bf0b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -96,7 +96,7 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
-    if (this.interpreterProcess != null) {
+    if (this.interpreterProcess != null && this.interpreterProcess.isRunning()) {
       return this.interpreterProcess;
     }
     ManagedInterpreterGroup intpGroup = getInterpreterGroup();
@@ -125,9 +125,9 @@ public class RemoteInterpreter extends Interpreter {
               ((RemoteInterpreter) interpreter).internal_create();
             }
           } catch (IOException e) {
-            LOGGER.error("Interpreter creation failed", e);
-            interpreterProcess = null;
-            getInterpreterGroup().close(sessionId);
+//            LOGGER.error("Interpreter creation failed", e);
+//            interpreterProcess = null;
+//            getInterpreterGroup().close(sessionId);
             throw new InterpreterException(e);
           }
         }

[zeppelin] 01/04: Close interpreter group session if interpreter creation failed

Posted by pd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch recover_interpretergroup
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit c11f108c0235fa42362ac4e972090db843796b19
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue Oct 19 15:51:50 2021 +0200

    Close interpreter group session if interpreter creation failed
---
 .../org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 02f1b62..16e47c3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -30,7 +30,6 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.LifecycleManager;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
@@ -126,6 +125,9 @@ public class RemoteInterpreter extends Interpreter {
               ((RemoteInterpreter) interpreter).internal_create();
             }
           } catch (IOException e) {
+            LOGGER.error("Interpreter creation failed", e);
+            interpreterProcess = null;
+            getInterpreterGroup().close(sessionId);
             throw new InterpreterException(e);
           }
         }

[zeppelin] 03/04: Restart a crashed interpreter process

Posted by pd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch recover_interpretergroup
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit 4c3f8a1cbc685541d334b299bf3642b334948561
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Fri Oct 22 14:52:55 2021 +0200

    Restart a crashed interpreter process
---
 .../org/apache/zeppelin/interpreter/InterpreterSetting.java |  8 ++++++++
 .../zeppelin/interpreter/ManagedInterpreterGroup.java       | 13 +++++++------
 .../zeppelin/interpreter/remote/RemoteInterpreter.java      |  2 +-
 3 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index bff9273..06ec121 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -455,6 +455,14 @@ public class InterpreterSetting {
             groupId, executionContext);
         ManagedInterpreterGroup intpGroup = createInterpreterGroup(groupId);
         interpreterGroups.put(groupId, intpGroup);
+      } else {
+        // Check for a crashed interpreter process and restart interpreterGroup in this case
+        ManagedInterpreterGroup interpreterGroup = interpreterGroups.get(groupId);
+        if (interpreterGroup.isInterpreterProcessCrashed()) {
+          interpreterGroup.close();
+          interpreterGroups.remove(interpreterGroup.getId());
+          return getOrCreateInterpreterGroup(executionContext);
+        }
       }
       return interpreterGroups.get(groupId);
     } finally {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index af7e7e8..1140ec9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -57,16 +57,17 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
     return interpreterSetting;
   }
 
+  public boolean isInterpreterProcessCrashed() {
+    if (remoteInterpreterProcess == null) {
+      return false;
+    }
+    return !remoteInterpreterProcess.isRunning();
+  }
+
   public RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
                                                                 Properties properties)
       throws IOException {
     synchronized (interpreterProcessCreationLock) {
-      // Stop the interpreterProcess to cleanup the state if not running
-      if (remoteInterpreterProcess != null && !remoteInterpreterProcess.isRunning()) {
-        LOGGER.info("InterpreterProcess for InterpreterGroup {} is not running. Stop the interpreter process to clean up the state. Error message: {}", getId(), remoteInterpreterProcess.getErrorMessage());
-        remoteInterpreterProcess.stop();
-        remoteInterpreterProcess = null;
-      }
       if (remoteInterpreterProcess == null) {
         LOGGER.info("Create InterpreterProcess for InterpreterGroup: {}", getId());
         remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName,
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 439bf0b..967f4fa 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -96,7 +96,7 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
-    if (this.interpreterProcess != null && this.interpreterProcess.isRunning()) {
+    if (this.interpreterProcess != null) {
       return this.interpreterProcess;
     }
     ManagedInterpreterGroup intpGroup = getInterpreterGroup();