Posted to reviews@spark.apache.org by andrewor14 <gi...@git.apache.org> on 2014/05/01 01:50:25 UTC

[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

GitHub user andrewor14 opened a pull request:

    https://github.com/apache/spark/pull/603

    [SPARK-1688] PySpark throws unhelpful exception when pyspark is not found

    Currently, if pyspark cannot be loaded for any reason, Spark throws a random `java.io.EOFException` when trying to read from the python daemon's output. This can be caused by one of the following:
    
    (1) PYTHONPATH is not set
    (2) PYTHONPATH does not contain the python directory (or jar, in the case of YARN)
    (3) The jar does not contain pyspark files (YARN)
    (4) The jar does not contain py4j files (YARN)
    
    This PR distinguishes between these cases and throws a more helpful exception earlier with the appropriate error message.
    
    Tested on both a standalone cluster and a YARN cluster.
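
As an illustration of the up-front check described above, here is a minimal sketch covering cases (1) and (2). It is not the code from this pull request: `PythonPathCheck`, `diagnose`, and `dirHasPySpark` are hypothetical names, and the jar inspection needed for the YARN cases (3) and (4) is left out.

```scala
import java.io.File

// Hypothetical helper, not part of Spark: classify an unusable PYTHONPATH
// before any Python worker is started.
object PythonPathCheck {
  /** Returns Some(errorMessage) if the PYTHONPATH is unusable, None otherwise. */
  def diagnose(pythonPath: Option[String], hasPySpark: String => Boolean): Option[String] =
    pythonPath.map(_.trim).filter(_.nonEmpty) match {
      case None =>
        // Case (1): the variable is missing or empty.
        Some("PYTHONPATH is not set")
      case Some(path) =>
        // Case (2): no entry on the path provides pyspark.
        val entries = path.split(File.pathSeparator).filter(_.nonEmpty)
        if (entries.exists(hasPySpark)) None
        else Some(s"PYTHONPATH does not contain pyspark: $path")
    }

  /** Directory-based probe: true if `entry` holds a `pyspark` package directory. */
  def dirHasPySpark(entry: String): Boolean =
    new File(entry, "pyspark").isDirectory
}
```

A caller might run `PythonPathCheck.diagnose(sys.env.get("PYTHONPATH"), PythonPathCheck.dirHasPySpark)` and throw with the returned message instead of waiting for the `EOFException`.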

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andrewor14/spark pyspark-exception

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/603.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #603
    
----
commit 6c09c2112ed00534573f7f58bd0633d0cb34a65d
Author: Andrew Or <an...@gmail.com>
Date:   2014-04-30T21:26:06Z

    Validate PYTHONPATH and PySpark modules before starting python workers

commit dcc0353f8a806e5e7ad86cd5b167b414252d25ac
Author: Andrew Or <an...@gmail.com>
Date:   2014-04-30T22:53:02Z

    Check both python and system environment variables for PYTHONPATH

commit cdbc18598383d39f3157a6fef882834ea03118d4
Author: Andrew Or <an...@gmail.com>
Date:   2014-04-30T23:15:30Z

    Fix python attribute not found exception when PYTHONPATH is not set

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12388303
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -208,6 +170,19 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         }
       }
     
    +  /**
    +   * Redirect a worker's stdout and stderr to our stderr.
    +   */
    +  private def redirectWorkerStreams(stdout: InputStream, stderr: InputStream) {
    +    try {
    +      new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    +      new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
    +    } catch {
    +      case e: Throwable =>
    --- End diff --
    
    It could. The reason for the paranoia is that I don't want both the stderr RedirectThread and the catch block in `startDaemon` to contend for the error stream. But sure, it doesn't have to be a `Throwable`.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42346964
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12343426
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -156,46 +154,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.putAll(envVars)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectStream("stdout reader for " + pythonExec, in)
    +        redirectStream("stderr reader for " + pythonExec, in)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +          val stderr = Source.fromInputStream(daemon.getErrorStream).getLines().mkString("\n")
               stopDaemon()
    -          throw e
    -        }
    +
    +          // Append error message from python daemon, but keep original stack trace
    +          val errorMessage =
    +            s"""
    +              |Exception from python worker - "$stderr"
    --- End diff --
    
    This will be a bit awkward if the output has linebreaks in it... it might be good to put `$stderr` on its own line. Or even something like this:
    
    ```
    s"""
       |Exception from python worker
       |${stderr.mkString("  ", "\n", "")} 
    ```
    And then you'd not do `mkString` above.
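
One way to read this suggestion is to keep the worker output as a sequence of lines and indent each one under a header. The sketch below assumes a hypothetical helper, `WorkerErrorFormat.format`; it is not the formatting code that was eventually merged.

```scala
// Hypothetical helper, not part of Spark: render multi-line worker output
// under a header, indenting every line by two spaces.
object WorkerErrorFormat {
  def format(stderr: Seq[String]): String =
    ("Exception from python worker:" +: stderr.map("  " + _)).mkString("\n")
}
```

Building the message with `mkString` instead of interpolating into a `stripMargin` string sidesteps one pitfall: `stripMargin` runs on the already interpolated text, so a worker line that happens to start with `|` would be stripped as well.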



[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-41865606
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12390525
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectWorkerStreams(in, daemon.getErrorStream)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +
    +          // If the daemon exists, wait for it to finish and get its stderr
    +          val stderr = Option(daemon)
    +            .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
    +            .getOrElse("")
    +
               stopDaemon()
    -          throw e
    -        }
    +
    +          if (stderr != "") {
    +            val formattedStderr = stderr.replace("\n", "\n  ")
    +            val errorMessage = s"""
    +              |Error from python worker:
    +              |  $formattedStderr
    +              |PYTHONPATH was:
    +              |  $pythonPath
    +              |$e"""
    +
    +            // Append error message from python daemon, but keep original stack trace
    --- End diff --
    
    We're not hiding the exception; all we're doing is tacking a message on top of it. Not exactly sure what you mean?



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12391570
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectWorkerStreams(in, daemon.getErrorStream)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    --- End diff --
    
    aha, k.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42356729
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42355570
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-41867972
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12389493
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectWorkerStreams(in, daemon.getErrorStream)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +
    +          // If the daemon exists, wait for it to finish and get its stderr
    +          val stderr = Option(daemon)
    +            .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
    +            .getOrElse("")
    +
               stopDaemon()
    -          throw e
    -        }
    +
    +          if (stderr != "") {
    +            val formattedStderr = stderr.replace("\n", "\n  ")
    +            val errorMessage = s"""
    +              |Error from python worker:
    +              |  $formattedStderr
    +              |PYTHONPATH was:
    +              |  $pythonPath
    +              |$e"""
    +
    +            // Append error message from python daemon, but keep original stack trace
    --- End diff --
    
    Maybe make a comment somewhere that we're assuming any stderr from the daemon has the real error, which is why we're hiding the Exception.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42378456
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42344667
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12358117
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -38,7 +41,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       var daemonPort: Int = 0
     
       val pythonPath = PythonUtils.mergePythonPaths(
    -    PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
    +    PythonUtils.sparkPythonPath,
    +    envVars.getOrElse("PYTHONPATH", ""),
    +    sys.env.getOrElse("PYTHONPATH", ""))
    --- End diff --
    
    This is added to make sure we don't ignore the PYTHONPATH set by the user, if any.
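
The implementation of `PythonUtils.mergePythonPaths` is not shown in this thread. As a rough sketch of what merging the three sources could look like, assuming empty entries are dropped and the rest are joined with the platform path separator (`PythonPathMerge.merge` is a hypothetical stand-in):

```scala
import java.io.File

// Hypothetical stand-in for a path-merging helper; the real
// PythonUtils.mergePythonPaths may behave differently.
object PythonPathMerge {
  /** Join the non-empty path strings with the platform path separator. */
  def merge(paths: String*): String =
    paths.filter(_.nonEmpty).mkString(File.pathSeparator)
}
```

With this shape, a missing `PYTHONPATH` in either `envVars` or `sys.env` contributes an empty string and simply disappears from the result.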



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12343735
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -156,46 +154,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.putAll(envVars)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectStream("stdout reader for " + pythonExec, in)
    +        redirectStream("stderr reader for " + pythonExec, in)
    --- End diff --
    
    This says stderr reader, but it redirects the same stream (`in`)... is that a mistake?



[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-41867974
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14593/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12391211
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -208,6 +170,19 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         }
       }
     
    +  /**
    +   * Redirect a worker's stdout and stderr to our stderr.
    +   */
    +  private def redirectWorkerStreams(stdout: InputStream, stderr: InputStream) {
    +    try {
    +      new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    +      new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
    +    } catch {
    +      case e: Throwable =>
    --- End diff --
    
    It can't throw an NPE since that's inside the run()
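
The point about `run()` can be checked with a small experiment. The sketch below is illustrative only (`StartDoesNotRethrow` is a hypothetical name): an exception thrown inside `run()` goes to the thread's uncaught-exception handler and never reaches a `try` wrapped around `start()`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical demo object, not part of Spark.
object StartDoesNotRethrow {
  /** Returns (caught around start(), exception seen by the thread's handler). */
  def demo(): (Boolean, Option[Throwable]) = {
    val seen = new AtomicReference[Throwable](null)
    val t = new Thread("failing reader") {
      override def run(): Unit = throw new NullPointerException("inside run()")
    }
    t.setDaemon(true)
    // Record the failure instead of printing the default stack trace.
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, e: Throwable): Unit = seen.set(e)
    })
    val caught =
      try { t.start(); t.join(); false }
      catch { case _: Exception => true }
    (caught, Option(seen.get()))
  }
}
```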



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/603



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42360848
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14732/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12343940
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -156,46 +154,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.putAll(envVars)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectStream("stdout reader for " + pythonExec, in)
    +        redirectStream("stderr reader for " + pythonExec, in)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +          val stderr = Source.fromInputStream(daemon.getErrorStream).getLines().mkString("\n")
    --- End diff --
    
    I'm a bit surprised this stream actually contains anything, since (a) the process is likely dead and (b) even if it was buffering some data, our reader thread will already have read this stream to the end.
    
    Did you confirm that this will get the exception content?
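
Concern (b) can be illustrated without a real process. In the sketch below a `ByteArrayInputStream` stands in for the daemon's error stream (an assumption made for the demo, as is the name `DrainedStreamDemo`): whichever reader gets to the stream first consumes the bytes, and a second reader sees nothing.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import scala.io.Source

// Hypothetical demo object, not part of Spark.
object DrainedStreamDemo {
  /** Read everything currently left in `in` as text. */
  def readAll(in: InputStream): String =
    Source.fromInputStream(in, "UTF-8").mkString

  /** Returns what the first and the second reader obtained from one stream. */
  def demo(): (String, String) = {
    val in = new ByteArrayInputStream(
      "ImportError: No module named pyspark".getBytes("UTF-8"))
    val first = readAll(in)  // stands in for the redirect thread
    val second = readAll(in) // stands in for the catch block
    (first, second)
  }
}
```

With a live process the two readers would instead race for the bytes, so each could end up with an arbitrary share of the output.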


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12389318
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectWorkerStreams(in, daemon.getErrorStream)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    --- End diff --
    
    Let's catch Exception instead... I mean, we generally want to catch OOMs and Scala break(), but I think we can make an _exception_ here. 
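
To make the difference in scope concrete, here is a minimal sketch (not code from the PR). The object `CatchScope` and its `classify` helper are hypothetical names introduced only for illustration. It shows that a `case e: Exception` handler sees ordinary failures such as `EOFException`, while JVM errors such as `OutOfMemoryError` pass through to whatever handler sits further out.

```scala
object CatchScope {
  // Hypothetical helper: runs `body` and reports which handler observed the failure.
  def classify(body: => Unit): String =
    try {
      try {
        body
        "no failure"
      } catch {
        // Narrow handler, as suggested in the review comment.
        case e: Exception => "caught as Exception: " + e.getClass.getSimpleName
      }
    } catch {
      // Anything that is a Throwable but not an Exception ends up here.
      case t: Throwable => "escaped to outer handler: " + t.getClass.getSimpleName
    }
}
```

With the narrow handler, errors that the JVM or Scala uses for control flow keep propagating instead of being swallowed alongside ordinary failures.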



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42355548
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42378451
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42351667
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14729/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42351664
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42461358
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42360845
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42356706
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12387836
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -208,6 +170,19 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         }
       }
     
    +  /**
    +   * Redirect a worker's stdout and stderr to our stderr.
    +   */
    +  private def redirectWorkerStreams(stdout: InputStream, stderr: InputStream) {
    +    try {
    +      new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    +      new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
    +    } catch {
    +      case e: Throwable =>
    --- End diff --
    
    Couldn't this throw a null pointer exception?



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42469138
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14780/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42381667
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12387647
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -208,6 +170,19 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         }
       }
     
    +  /**
    +   * Redirect a worker's stdout and stderr to our stderr.
    +   */
    +  private def redirectWorkerStreams(stdout: InputStream, stderr: InputStream) {
    +    try {
    +      new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    +      new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
    +    } catch {
    +      case e: Throwable =>
    --- End diff --
    
    Instantiating and starting a Thread generally never throw exceptions, but if you're really keen on doing so, catch Exception instead of Throwable and add the exception to the logWarning.
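
A minimal sketch of that suggestion follows, under stated assumptions: `StartWithWarning`, `startQuietly`, and the local `logWarning` are hypothetical stand-ins (Spark's own logging trait is not reproduced here). The sketch catches `Exception` rather than `Throwable` and passes the exception along to the warning.

```scala
import scala.collection.mutable.ArrayBuffer

object StartWithWarning {
  // Hypothetical stand-in for a logger; messages are collected so they can be inspected.
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def logWarning(msg: String, e: Throwable): Unit = {
    warnings += s"$msg: $e"
  }

  // Start the thread; if starting fails with an ordinary exception, log it with its cause.
  def startQuietly(t: Thread): Unit =
    try {
      t.start()
    } catch {
      case e: Exception => logWarning("Could not start " + t.getName, e)
    }
}
```

One of the few ways `Thread.start()` does throw is when the same thread is started twice, which raises `IllegalThreadStateException`; that case is a convenient way to exercise the handler.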



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42360851
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42360853
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14733/



[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-41865620
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42351662
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42346957
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12389542
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    --- End diff --
    
    It's relatively confusing terminology, but I think this is only redirecting the daemon's stdout and stderr. Other places in this PR I think there is a similar issue.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42461376
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42381668
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14748/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42378929
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12391196
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -161,46 +131,38 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.put("PYTHONPATH", pythonPath)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectWorkerStreams(in, daemon.getErrorStream)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +
    +          // If the daemon exists, wait for it to finish and get its stderr
    +          val stderr = Option(daemon)
    +            .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
    +            .getOrElse("")
    +
               stopDaemon()
    -          throw e
    -        }
    +
    +          if (stderr != "") {
    +            val formattedStderr = stderr.replace("\n", "\n  ")
    +            val errorMessage = s"""
    +              |Error from python worker:
    +              |  $formattedStderr
    +              |PYTHONPATH was:
    +              |  $pythonPath
    +              |$e"""
    +
    +            // Append error message from python daemon, but keep original stack trace
    --- End diff --
    
    Oops, didn't see the $e in the errorMessage



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42381670
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14747/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12391785
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -208,6 +170,19 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         }
       }
     
    +  /**
    +   * Redirect a worker's stdout and stderr to our stderr.
    +   */
    +  private def redirectWorkerStreams(stdout: InputStream, stderr: InputStream) {
    +    try {
    +      new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    +      new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
    +    } catch {
    +      case e: Throwable =>
    --- End diff --
    
    I see, the NPE will be contained within the thread, but won't be propagated here.
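
The containment described here can be sketched as follows. This is an illustration only: `ThreadContainment` and `startFailingThread` are hypothetical names, and the thread body simply throws a simulated `NullPointerException`. The surrounding `try`/`catch` around `start()` never sees the failure; only the thread's own uncaught-exception handler does.

```scala
import java.util.concurrent.atomic.AtomicReference

object ThreadContainment {
  // Returns (did the caller's catch block fire?, what the thread itself observed).
  def startFailingThread(): (Boolean, Option[Throwable]) = {
    val seenInThread = new AtomicReference[Throwable](null)

    val t = new Thread("failing reader") {
      override def run(): Unit = throw new NullPointerException("simulated")
    }
    t.setDaemon(true)
    // Record the failure instead of letting the default handler print a stack trace.
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(th: Thread, e: Throwable): Unit = seenInThread.set(e)
    })

    val caughtByCaller =
      try {
        t.start()
        t.join()
        false
      } catch {
        case _: Exception => true
      }

    (caughtByCaller, Option(seenInThread.get()))
  }
}
```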



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12343652
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -156,46 +154,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.putAll(envVars)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectStream("stdout reader for " + pythonExec, in)
    +        redirectStream("stderr reader for " + pythonExec, in)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +          val stderr = Source.fromInputStream(daemon.getErrorStream).getLines().mkString("\n")
    --- End diff --
    
    Can't daemon be `null` here if there is an exception before it's assigned?
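
One way to guard against that case is to wrap the possibly-null reference in `Option`, which is the approach a later revision of this diff takes with `Option(daemon)`. Below is a minimal sketch; `NullSafeStderr`, `stderrOrEmpty`, and the `read` parameter are hypothetical names, and the function is generic so it can be exercised without launching a real process.

```scala
object NullSafeStderr {
  // `read` is a hypothetical stand-in for whatever extracts stderr from the daemon.
  // Option(null) is None, so a daemon that was never assigned yields the empty string.
  def stderrOrEmpty[P <: AnyRef](daemon: P, read: P => Option[String]): String =
    Option(daemon).flatMap(read).getOrElse("")
}
```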



[GitHub] spark pull request: [SPARK-1688] PySpark throws unhelpful exceptio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42344650
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42381665
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42378923
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12346357
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -156,46 +154,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.putAll(envVars)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectStream("stdout reader for " + pythonExec, in)
    +        redirectStream("stderr reader for " + pythonExec, in)
    +
           } catch {
    -        case e: Throwable => {
    +        case e: Throwable =>
    +          val stderr = Source.fromInputStream(daemon.getErrorStream).getLines().mkString("\n")
    --- End diff --
    
    Yes, we do get the exception content. As discussed offline, this is because we start the redirect thread after the point at which an exception can be thrown. This is not safe, however, because the redirection itself can throw an exception.
    
    The null thing is also a potential source for NPE. This will be addressed in the next commit.
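
The ordering point can be illustrated with an in-memory stream standing in for `daemon.getErrorStream`. This is only a sketch under that assumption: `DrainOrder` and `readAll` are hypothetical names, and no process is started. The first reader gets the content; any reader that comes after the stream has been drained gets nothing, which is why the content is only available if the redirect thread has not yet consumed it.

```scala
import java.io.InputStream
import scala.io.Source

object DrainOrder {
  // Read everything currently available from the stream, the same way the diff does.
  def readAll(in: InputStream): String =
    Source.fromInputStream(in, "UTF-8").getLines().mkString("\n")
}
```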



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42351669
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14730/



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42469135
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12358073
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala ---
    @@ -40,3 +40,28 @@ private[spark] object PythonUtils {
         paths.filter(_ != "").mkString(File.pathSeparator)
       }
     }
    +
    +
    +/**
    + * A utility class to redirect the child process's stdout or stderr.
    + */
    +private[spark] class RedirectThread(
    --- End diff --
    
    Added in #664 to remove duplicate code. This is moved from PythonRunner.scala.
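
For readers without the full diff, a redirect thread of this kind can be sketched as follows. This is an illustration modeled on the inline threads removed from PythonWorkerFactory.scala in this PR, not the `RedirectThread` class itself; the class name `StreamRedirector` and its constructor parameters are assumptions.

```scala
import java.io.{IOException, InputStream, OutputStream}

// Hypothetical stand-in for the utility class: copies `in` to `out` on a
// daemon thread until `in` reaches end of stream.
class StreamRedirector(in: InputStream, out: OutputStream, name: String)
  extends Thread(name) {

  setDaemon(true)

  override def run(): Unit = {
    // Ignore IOExceptions, as the removed inline threads did.
    scala.util.control.Exception.ignoring(classOf[IOException]) {
      // Copy at the byte level to avoid encoding problems.
      val buf = new Array[Byte](1024)
      var len = in.read(buf)
      while (len != -1) {
        out.write(buf, 0, len)
        out.flush()
        len = in.read(buf)
      }
    }
  }
}
```

One instance per stream would be started, for example `new StreamRedirector(daemon.getErrorStream, System.err, "stderr reader").start()`, with each instance given its own input stream.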



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/603#issuecomment-42486038
  
    LGTM, merging into master and spark-1.0.



[GitHub] spark pull request: [SPARK-1688] Propagate PySpark worker stderr t...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/603#discussion_r12344018
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
    @@ -156,46 +154,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
             workerEnv.putAll(envVars)
             daemon = pb.start()
     
    -        // Redirect the stderr to ours
    -        new Thread("stderr reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val in = daemon.getErrorStream
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    -
             val in = new DataInputStream(daemon.getInputStream)
             daemonPort = in.readInt()
     
    -        // Redirect further stdout output to our stderr
    -        new Thread("stdout reader for " + pythonExec) {
    -          setDaemon(true)
    -          override def run() {
    -            scala.util.control.Exception.ignoring(classOf[IOException]) {
    -              // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
    -              val buf = new Array[Byte](1024)
    -              var len = in.read(buf)
    -              while (len != -1) {
    -                System.err.write(buf, 0, len)
    -                len = in.read(buf)
    -              }
    -            }
    -          }
    -        }.start()
    +        // Redirect worker stdout and stderr
    +        redirectStream("stdout reader for " + pythonExec, in)
    +        redirectStream("stderr reader for " + pythonExec, in)
    --- End diff --
    
    Good catch

