You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/24 05:26:52 UTC
zeppelin git commit: [ZEPPELIN-1984] Capture stdin/stdout on
interpreter process creation and propagate to front-end when error
Repository: zeppelin
Updated Branches:
refs/heads/master 9b4a1bf72 -> 92cebe59d
[ZEPPELIN-1984] Capture stdin/stdout on interpreter process creation and propagate to front-end when error
### What is this PR for?
This PR captures stdin/stdout on interpreter process creation and propagates it to the front-end when an error occurs.
### What type of PR is it?
Improvement
### Todos
* [x] - Capture and propagate stdin/stdout
* [x] - unittest
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1984
### How should this be tested?
Outline the steps to test the PR here.
### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: Lee moon soo <mo...@apache.org>
Closes #1931 from Leemoonsoo/ZEPPELIN-1894 and squashes the following commits:
2e4867b [Lee moon soo] update unittest
947d183 [Lee moon soo] Propagate process creation error
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/92cebe59
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/92cebe59
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/92cebe59
Branch: refs/heads/master
Commit: 92cebe59da573ee0ededd9c69055834d5a7e4fff
Parents: 9b4a1bf
Author: Lee moon soo <mo...@apache.org>
Authored: Sat Jan 21 18:03:40 2017 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Jan 23 21:26:10 2017 -0800
----------------------------------------------------------------------
.../remote/RemoteInterpreterManagedProcess.java | 51 +++++++++++++++++++-
.../java/org/apache/zeppelin/scheduler/Job.java | 6 ++-
.../zeppelin/scheduler/RemoteScheduler.java | 4 ++
.../remote/RemoteInterpreterProcessTest.java | 18 +++++++
.../remote/RemoteInterpreterTest.java | 4 +-
5 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index f3d5f91..f5d73ed 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
@@ -109,7 +110,12 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
cmdLine.addArgument(localRepoDir, false);
executor = new DefaultExecutor();
- executor.setStreamHandler(new PumpStreamHandler(new ProcessLogOutputStream(logger)));
+
+ ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+ ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+ processOutput.setOutputStream(cmdOut);
+
+ executor.setStreamHandler(new PumpStreamHandler(processOutput));
watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);
@@ -128,6 +134,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+ if (!running) {
+ try {
+ cmdOut.flush();
+ } catch (IOException e) {
+ // nothing to do
+ }
+ throw new InterpreterException(new String(cmdOut.toByteArray()));
+ }
+
try {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
break;
@@ -145,6 +160,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
}
}
+ processOutput.setOutputStream(null);
}
public void stop() {
@@ -179,6 +195,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
private static class ProcessLogOutputStream extends LogOutputStream {
private Logger logger;
+ OutputStream out;
public ProcessLogOutputStream(Logger logger) {
this.logger = logger;
@@ -188,5 +205,37 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
protected void processLine(String s, int i) {
this.logger.debug(s);
}
+
+ @Override
+ public void write(byte [] b) throws IOException {
+ super.write(b);
+
+ if (out != null) {
+ synchronized (this) {
+ if (out != null) {
+ out.write(b);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void write(byte [] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+
+ if (out != null) {
+ synchronized (this) {
+ if (out != null) {
+ out.write(b, offset, len);
+ }
+ }
+ }
+ }
+
+ public void setOutputStream(OutputStream out) {
+ synchronized (this) {
+ this.out = out;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 9bb26f3..a690bef 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -202,7 +202,11 @@ public abstract class Job {
}
Throwable cause = ExceptionUtils.getRootCause(e);
- return ExceptionUtils.getFullStackTrace(cause);
+ if (cause != null) {
+ return ExceptionUtils.getFullStackTrace(cause);
+ } else {
+ return ExceptionUtils.getFullStackTrace(e);
+ }
}
public Throwable getException() {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 8cd38dc..0101b18 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -346,6 +346,10 @@ public class RemoteScheduler implements Scheduler {
lastStatus = Status.ERROR;
}
}
+ if (job.getException() != null) {
+ lastStatus = Status.ERROR;
+ }
+
job.setStatus(lastStatus);
if (listener != null) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
index 99ab63b..39a17ae 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.junit.Test;
@@ -109,4 +110,21 @@ public class RemoteInterpreterProcessTest {
assertEquals(1, rip.reference(intpGroup, "anonymous", false));
assertEquals(true, rip.isRunning());
}
+
+
+ @Test
+ public void testPropagateError() throws TException, InterruptedException {
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+ "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+ 10 * 1000, null, null);
+ assertFalse(rip.isRunning());
+ assertEquals(0, rip.referenceCount());
+ try {
+ assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+ } catch (InterpreterException e) {
+ e.getMessage().contains("hello_world");
+ }
+ assertEquals(0, rip.referenceCount());
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/92cebe59/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 71e5f56..51c18f7 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -831,8 +831,8 @@ public class RemoteInterpreterTest {
assertEquals("env value 1", intp.interpret("getEnv MY_ENV1", context).message().get(0).getData());
- assertEquals(0, intp.interpret("getProperty MY_ENV1", context).message().size());
- assertEquals(0, intp.interpret("getEnv my.property.1", context).message().size());
+ assertEquals(Code.ERROR, intp.interpret("getProperty MY_ENV1", context).code());
+ assertEquals(Code.ERROR, intp.interpret("getEnv my.property.1", context).code());
assertEquals("property value 1", intp.interpret("getProperty my.property.1", context).message().get(0).getData());
intp.close();