You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/24 05:26:52 UTC

zeppelin git commit: [ZEPPELIN-1984] Capture stdin/stdout on interpreter process creation and propagate to front-end when error

Repository: zeppelin
Updated Branches:
  refs/heads/master 9b4a1bf72 -> 92cebe59d


[ZEPPELIN-1984] Capture stdin/stdout on interpreter process creation and propagate to front-end when error

### What is this PR for?
This PR captures stdin/stdout on interpreter process creation and propagates it to the front-end when an error occurs.

### What type of PR is it?
Improvement

### Todos
* [x] - Capture and propagate stdin/stdout
* [x] - unittest

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1984

### How should this be tested?
Outline the steps to test the PR here.

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: Lee moon soo <mo...@apache.org>

Closes #1931 from Leemoonsoo/ZEPPELIN-1894 and squashes the following commits:

2e4867b [Lee moon soo] update unittest
947d183 [Lee moon soo] Propagate process creation error


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/92cebe59
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/92cebe59
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/92cebe59

Branch: refs/heads/master
Commit: 92cebe59da573ee0ededd9c69055834d5a7e4fff
Parents: 9b4a1bf
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Jan 21 18:03:40 2017 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Jan 23 21:26:10 2017 -0800

----------------------------------------------------------------------
 .../remote/RemoteInterpreterManagedProcess.java | 51 +++++++++++++++++++-
 .../java/org/apache/zeppelin/scheduler/Job.java |  6 ++-
 .../zeppelin/scheduler/RemoteScheduler.java     |  4 ++
 .../remote/RemoteInterpreterProcessTest.java    | 18 +++++++
 .../remote/RemoteInterpreterTest.java           |  4 +-
 5 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index f3d5f91..f5d73ed 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map;
@@ -109,7 +110,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
     cmdLine.addArgument(localRepoDir, false);
 
     executor = new DefaultExecutor();
-    executor.setStreamHandler(new PumpStreamHandler(new ProcessLogOutputStream(logger)));
+
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
     watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
     executor.setWatchdog(watchdog);
 
@@ -128,6 +134,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
 
     long startTime = System.currentTimeMillis();
     while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+      if (!running) {
+        try {
+          cmdOut.flush();
+        } catch (IOException e) {
+          // nothing to do
+        }
+        throw new InterpreterException(new String(cmdOut.toByteArray()));
+      }
+
       try {
         if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
           break;
@@ -145,6 +160,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
         }
       }
     }
+    processOutput.setOutputStream(null);
   }
 
   public void stop() {
@@ -179,6 +195,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
   private static class ProcessLogOutputStream extends LogOutputStream {
 
     private Logger logger;
+    OutputStream out;
 
     public ProcessLogOutputStream(Logger logger) {
       this.logger = logger;
@@ -188,5 +205,37 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
     protected void processLine(String s, int i) {
       this.logger.debug(s);
     }
+
+    @Override
+    public void write(byte [] b) throws IOException {
+      super.write(b);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b, offset, len);
+          }
+        }
+      }
+    }
+
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 9bb26f3..a690bef 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -202,7 +202,11 @@ public abstract class Job {
     }
 
     Throwable cause = ExceptionUtils.getRootCause(e);
-    return ExceptionUtils.getFullStackTrace(cause);
+    if (cause != null) {
+      return ExceptionUtils.getFullStackTrace(cause);
+    } else {
+      return ExceptionUtils.getFullStackTrace(e);
+    }
   }
 
   public Throwable getException() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 8cd38dc..0101b18 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -346,6 +346,10 @@ public class RemoteScheduler implements Scheduler {
           lastStatus = Status.ERROR;
         }
       }
+      if (job.getException() != null) {
+        lastStatus = Status.ERROR;
+      }
+
       job.setStatus(lastStatus);
 
       if (listener != null) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index 99ab63b..39a17ae 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.junit.Test;
@@ -109,4 +110,21 @@ public class RemoteInterpreterProcessTest {
     assertEquals(1, rip.reference(intpGroup, "anonymous", false));
     assertEquals(true, rip.isRunning());
   }
+
+
+  @Test
+  public void testPropagateError() throws TException, InterruptedException {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null);
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    try {
+      assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    } catch (InterpreterException e) {
+      e.getMessage().contains("hello_world");
+    }
+    assertEquals(0, rip.referenceCount());
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 71e5f56..51c18f7 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -831,8 +831,8 @@ public class RemoteInterpreterTest {
 
 
     assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
-    assertEquals(0, intp.interpret("getProperty MY_ENV1", context).message().size());
-    assertEquals(0, intp.interpret("getEnv my.property.1", context).message().size());
+    assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
+    assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
     assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
 
     intp.close();