Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/04/27 05:10:13 UTC

zeppelin git commit: ZEPPELIN-2445. Display which line of Python code raises an exception

Repository: zeppelin
Updated Branches:
  refs/heads/master 6eecdecb5 -> 5cd806dc3


ZEPPELIN-2445. Display which line of Python code raises an exception

### What is this PR for?
When Python code raises an exception, the user only sees the stacktrace and doesn't know which line caused it. This is inconvenient, especially when a paragraph contains many lines of code. This PR displays which line raised the exception.

Besides that, I also fix another issue: the error message was duplicated in Python 3 (see the Python 3 screenshots below).
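
The failing line is recovered by matching the `File "<stdin>", line N` frame in the formatted traceback and indexing back into the submitted statements. A minimal, self-contained sketch of that pattern (`run_user_code` is an illustrative helper, not part of the interpreter API):

```python
import re
import traceback

def run_user_code(stmts):
    """Hypothetical helper: execute user statements and, on failure,
    report the 1-based line that raised, using the same regex and
    message format as this PR."""
    try:
        exec(compile("\n".join(stmts), "<stdin>", "exec"), {})
        return "", False
    except Exception:
        exception = traceback.format_exc()
        # User code is compiled with filename "<stdin>", so its frames
        # carry that name; the captured line number indexes into stmts.
        m = re.search(r"File \"<stdin>\", line (\d+)", exception)
        if m:
            line_no = int(m.group(1))
            return ("Fail to execute line {}: {}\n".format(
                line_no, stmts[line_no - 1]) + exception, True)
        return exception, True
```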

### What type of PR is it?
[Improvement]

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2445

### How should this be tested?
A unit test is added; it runs a paragraph that raises a `NameError` and asserts on the reported line.
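
With this change, a failing paragraph like the one in the added test (`a=1` on line 1, `print(a2)` on line 3) reports output along these lines; this is an illustrative reconstruction from the message format and test assertions in the diff, not captured output:

```
Fail to execute line 3: print(a2)
Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
NameError: name 'a2' is not defined
```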

### Screenshots (if appropriate)
Python 2 Before
![image](https://cloud.githubusercontent.com/assets/164491/25367168/89ce67fa-29a6-11e7-900c-cff2c66a6df4.png)

Python 2 After
![image](https://cloud.githubusercontent.com/assets/164491/25366936/65d246d8-29a5-11e7-8ad7-6786252e7913.png)

Python 3 Before
![image](https://cloud.githubusercontent.com/assets/164491/25367181/9d9a1608-29a6-11e7-8a62-5404b5941256.png)

Python 3 After
![image](https://cloud.githubusercontent.com/assets/164491/25366943/6f0200c2-29a5-11e7-8fe6-80794e7c7d86.png)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2280 from zjffdu/ZEPPELIN-2445 and squashes the following commits:

fa0cfba [Jeff Zhang] ZEPPELIN-2445. Display which line of Python code raises an exception


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5cd806dc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5cd806dc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5cd806dc

Branch: refs/heads/master
Commit: 5cd806dc3290cbcdcfb7db4f160e32077cf2e522
Parents: 6eecdec
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Apr 24 15:55:14 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Apr 27 13:10:06 2017 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      | 32 ++++++------
 .../main/resources/python/zeppelin_pyspark.py   | 51 ++++++++------------
 spark/src/test/resources/log4j.properties       | 47 ++++++++++++++++++
 .../util/InterpreterOutputStream.java           |  2 +-
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 21 ++++++++
 .../src/test/resources/log4j.properties         |  3 +-
 6 files changed, 109 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index bf0a915..56cec94 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -63,7 +63,7 @@ import py4j.GatewayServer;
  *
  */
 public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler {
-  Logger logger = LoggerFactory.getLogger(PySparkInterpreter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class);
   private GatewayServer gatewayServer;
   private DefaultExecutor executor;
   private int port;
@@ -106,7 +106,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       throw new InterpreterException(e);
     }
 
-    logger.info("File {} created", scriptPath);
+    LOGGER.info("File {} created", scriptPath);
   }
 
   @Override
@@ -131,7 +131,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
             try {
               urlList.add(f.toURI().toURL());
             } catch (MalformedURLException e) {
-              logger.error("Error", e);
+              LOGGER.error("Error", e);
             }
           }
         }
@@ -148,7 +148,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
             try {
               urlList.add(f.toURI().toURL());
             } catch (MalformedURLException e) {
-              logger.error("Error", e);
+              LOGGER.error("Error", e);
             }
           }
         }
@@ -162,7 +162,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       Thread.currentThread().setContextClassLoader(newCl);
       createGatewayServerAndStartScript();
     } catch (Exception e) {
-      logger.error("Error", e);
+      LOGGER.error("Error", e);
       throw new InterpreterException(e);
     } finally {
       Thread.currentThread().setContextClassLoader(oldCl);
@@ -217,7 +217,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     cmd.addArgument(Integer.toString(port), false);
     cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
     executor = new DefaultExecutor();
-    outputStream = new InterpreterOutputStream(logger);
+    outputStream = new InterpreterOutputStream(LOGGER);
     PipedOutputStream ps = new PipedOutputStream();
     in = null;
     try {
@@ -313,6 +313,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
   public void setStatementsFinished(String out, boolean error) {
     synchronized (statementFinishedNotifier) {
+      LOGGER.debug("Setting python statement output: " + out + ", error: " + error);
       statementOutput = out;
       statementError = error;
       statementFinishedNotifier.notify();
@@ -325,12 +326,14 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   public void onPythonScriptInitialized(long pid) {
     pythonPid = pid;
     synchronized (pythonScriptInitializeNotifier) {
+      LOGGER.debug("onPythonScriptInitialized is called");
       pythonScriptInitialized = true;
       pythonScriptInitializeNotifier.notifyAll();
     }
   }
 
   public void appendOutput(String message) throws IOException {
+    LOGGER.debug("Output from python process: " + message);
     outputStream.getInterpreterOutput().write(message);
   }
 
@@ -358,6 +361,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
         try {
           pythonScriptInitializeNotifier.wait(1000);
         } catch (InterruptedException e) {
+          LOGGER.error("Interrupted while waiting for python script initialization", e);
         }
       }
     }
@@ -426,10 +430,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
   public void interrupt() throws IOException {
     if (pythonPid > -1) {
-      logger.info("Sending SIGINT signal to PID : " + pythonPid);
+      LOGGER.info("Sending SIGINT signal to PID : " + pythonPid);
       Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
     } else {
-      logger.warn("Non UNIX/Linux system, close the interpreter");
+      LOGGER.warn("Non UNIX/Linux system, close the interpreter");
       close();
     }
   }
@@ -441,7 +445,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     try {
       interrupt();
     } catch (IOException e) {
-      logger.error("Error", e);
+      LOGGER.error("Error", e);
     }
   }
 
@@ -486,13 +490,13 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
         && pythonscriptRunning) {
         try {
           if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) {
-            logger.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC);
+            LOGGER.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC);
             break;
           }
           statementFinishedNotifier.wait(1000);
         } catch (InterruptedException e) {
           // not working
-          logger.info("wait drop");
+          LOGGER.info("wait drop");
           return new LinkedList<>();
         }
       }
@@ -527,7 +531,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       completionScriptText = text.substring(0, cursor);
     }
     catch (Exception e) {
-      logger.error(e.toString());
+      LOGGER.error(e.toString());
       return null;
     }
     completionEndPosition = completionScriptText.length();
@@ -637,12 +641,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   @Override
   public void onProcessComplete(int exitValue) {
     pythonscriptRunning = false;
-    logger.info("python process terminated. exit code " + exitValue);
+    LOGGER.info("python process terminated. exit code " + exitValue);
   }
 
   @Override
   public void onProcessFailed(ExecuteException e) {
     pythonscriptRunning = false;
-    logger.error("python process failed", e);
+    LOGGER.error("python process failed", e);
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index da4d794..4376888 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -21,15 +21,7 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 from py4j.protocol import Py4JJavaError
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-import warnings
 import ast
-import traceback
 import warnings
 
 # for back compatibility
@@ -231,20 +223,13 @@ class PySparkCompletion:
       result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList))))
       self.interpreterObject.setStatementsFinished(result, False)
 
-
-output = Logger()
-sys.stdout = output
-sys.stderr = output
-
 client = GatewayClient(port=int(sys.argv[1]))
 sparkVersion = SparkVersion(int(sys.argv[2]))
-
 if sparkVersion.isSpark2():
   from pyspark.sql import SparkSession
 else:
   from pyspark.sql import SchemaRDD
 
-
 if sparkVersion.isAutoConvertEnabled():
   gateway = JavaGateway(client, auto_convert = True)
 else:
@@ -257,6 +242,9 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*")
 java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
 
 intp = gateway.entry_point
+output = Logger()
+sys.stdout = output
+sys.stderr = output
 intp.onPythonScriptInitialized(os.getpid())
 
 jsc = intp.getJavaSparkContext()
@@ -310,7 +298,6 @@ while True :
   try:
     stmts = req.statements().split("\n")
     jobGroup = req.jobGroup()
-    final_code = []
     
     # Get post-execute hooks
     try:
@@ -328,22 +315,11 @@ while True :
       if hook:
         nhooks += 1
 
-    for s in stmts:
-      if s == None:
-        continue
-
-      # skip comment
-      s_stripped = s.strip()
-      if len(s_stripped) == 0 or s_stripped.startswith("#"):
-        continue
-
-      final_code.append(s)
-
-    if final_code:
+    if stmts:
       # use exec mode to compile the statements except the last statement,
       # so that the last statement's evaluation will be printed to stdout
       sc.setJobGroup(jobGroup, "Zeppelin")
-      code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
+      code = compile('\n'.join(stmts), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
       to_run_hooks = []
       if (nhooks > 0):
         to_run_hooks = code.body[-nhooks:]
@@ -365,10 +341,23 @@ while True :
           mod = ast.Module([node])
           code = compile(mod, '<stdin>', 'exec')
           exec(code, _zcUserQueryNameSpace)
+
+        intp.setStatementsFinished("", False)
+      except Py4JJavaError:
+        # raise it to outside try except
+        raise
       except:
-        raise Exception(traceback.format_exc())
+        exception = traceback.format_exc()
+        m = re.search("File \"<stdin>\", line (\d+).*", exception)
+        if m:
+          line_no = int(m.group(1))
+          intp.setStatementsFinished(
+            "Fail to execute line {}: {}\n".format(line_no, stmts[line_no - 1]) + exception, True)
+        else:
+          intp.setStatementsFinished(exception, True)
+    else:
+      intp.setStatementsFinished("", False)
 
-    intp.setStatementsFinished("", False)
   except Py4JJavaError:
     excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
     innerErrorStart = excInnerError.find("Py4JJavaError:")
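
The comment in the hunk above points at the underlying pattern: compile the paragraph once with `ast.PyCF_ONLY_AST`, execute each statement in `exec` mode, and compile only the last one in `single` mode so that a trailing expression echoes its value, as in a REPL. A compact standalone sketch of that split (Python 3.8+; `exec_and_echo_last` is a hypothetical name, and the real script interleaves this with hook handling and the error reporting shown above):

```python
import ast

def exec_and_echo_last(source, scope):
    """Run every statement in exec mode, but compile the final one in
    'single' mode so that, if it is an expression, its value prints."""
    tree = compile(source, "<stdin>", "exec", ast.PyCF_ONLY_AST, 1)
    if not tree.body:
        return
    for node in tree.body[:-1]:
        # type_ignores is a required Module field on Python 3.8+
        exec(compile(ast.Module(body=[node], type_ignores=[]),
                     "<stdin>", "exec"), scope)
    exec(compile(ast.Interactive(body=[tree.body[-1]]),
                 "<stdin>", "single"), scope)

exec_and_echo_last("a = 1\na + 41", {})  # prints 42
```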

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/src/test/resources/log4j.properties b/spark/src/test/resources/log4j.properties
new file mode 100644
index 0000000..b0d1067
--- /dev/null
+++ b/spark/src/test/resources/log4j.properties
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+#log4j.appender.stdout.layout.ConversionPattern=
+#%5p [%t] (%F:%L) - %m%n
+#%-4r [%t] %-5p %c %x - %m%n
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+ 
+#mute some noisy guys
+log4j.logger.org.apache.hadoop.mapred=WARN
+log4j.logger.org.apache.hadoop.hive.ql=WARN
+log4j.logger.org.apache.hadoop.hive.metastore=WARN
+log4j.logger.org.apache.hadoop.hive.service.HiveServer=WARN
+log4j.logger.org.apache.zeppelin.scheduler=WARN
+
+log4j.logger.org.quartz=WARN
+log4j.logger.DataNucleus=WARN
+log4j.logger.DataNucleus.MetaData=ERROR
+log4j.logger.DataNucleus.Datastore=ERROR
+
+# Log all JDBC parameters
+log4j.logger.org.hibernate.type=ALL
+
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.spark=DEBUG

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java
index 6bdc2db..6f2a0b4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java
@@ -28,7 +28,7 @@ import java.io.IOException;
  * Can be used to channel output from interpreters.
  */
 public class InterpreterOutputStream extends LogOutputStream {
-  public static Logger logger;
+  private Logger logger;
   InterpreterOutput interpreterOutput;
   boolean ignoreLeadingNewLinesFromScalaReporter = false;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index ec0e0bd..2714352 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -275,6 +275,27 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
                 waitForFinish(p);
                 assertEquals(Status.FINISHED, p.getStatus());
                 assertEquals("[Row(len=u'3')]\n", p.getResult().message().get(0).getData());
+
+                // test exception
+                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                /**
+                 %pyspark
+                 a=1
+
+                 print(a2)
+                 */
+                p.setText("%pyspark a=1\n\nprint(a2)");
+                p.setAuthenticationInfo(anonymous);
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.ERROR, p.getStatus());
+                assertTrue(p.getResult().message().get(0).getData()
+                    .contains("Fail to execute line 3: print(a2)"));
+                assertTrue(p.getResult().message().get(0).getData()
+                    .contains("name 'a2' is not defined"));
             }
             if (sparkVersion >= 20) {
                 // run SparkSession test

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 041daf0..b0d1067 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -43,4 +43,5 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
\ No newline at end of file
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.spark=DEBUG