You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by aarondav <gi...@git.apache.org> on 2014/05/05 04:54:51 UTC

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

GitHub user aarondav opened a pull request:

    https://github.com/apache/spark/pull/640

    SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions

    This patch includes several cleanups to PythonRDD, focused around fixing [SPARK-1579](https://issues.apache.org/jira/browse/SPARK-1579) cleanly. Listed in order of approximate importance:
    
    - The Python daemon waits for Spark to close the socket before exiting,
      in order to avoid causing spurious IOExceptions in Spark's
      `PythonRDD::WriterThread`.
    - Removes the Python Monitor Thread, which polled for task cancellations
      in order to kill the Python worker. Instead, we do this in the
      onCompleteCallback, since this is guaranteed to be called during
      cancellation.
    - Adds a "completed" variable to TaskContext to avoid the issue noted in
      [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), where onCompleteCallbacks may be execution-order dependent.
      Along with this, I removed the "context.interrupted = true" flag in
      the onCompleteCallback.
    - Extracts PythonRDD::WriterThread to its own class.
    
    Since this patch provides an alternative solution to [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), I did test it with
    
    ```
    sc.textFile("latlon.tsv").take(5)
    ```
    
    many times without error.
    
    Additionally, in order to test the unswallowed exceptions, I performed
    
    ```
    sc.textFile("s3n://<big file>").count()
    ```
    
    and cut my internet during execution. Prior to this patch, we got the "stdin writer exited early" message, which was unhelpful. Now, we get the SocketExceptions propagated through Spark to the user and get proper (though unsuccessful) task retries.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aarondav/spark pyspark-io

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/640.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #640
    
----
commit c0c49da3754d664ab1de76ff8e6fc86a4d5d8714
Author: Aaron Davidson <aa...@databricks.com>
Date:   2014-05-05T02:26:03Z

    SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions
    
    This patch includes several cleanups to PythonRDD, focused around
    fixing SPARK-1579 cleanly. Listed in order of importance:
    
    - The Python daemon waits for Spark to close the socket before exiting,
      in order to avoid causing spurious IOExceptions in Spark's
      PythonRDD::WriterThread.
    
    - Removes the Python Monitor Thread, which polled for task cancellations
      in order to kill the Python worker. Instead, we do this in the
      onCompleteCallback, since this is guaranteed to be called during
      cancellation.
    
    - Adds a "completed" variable to TaskContext to avoid the issue noted in
      SPARK-1019, where onCompleteCallbacks may be execution-order dependent.
      Along with this, I removed the "context.interrupted = true" flag in
      the onCompleteCallback.
    
    - Extracts PythonRDD::WriterThread to its own class.
    
    Since this patch provides an alternative solution to SPARK-1019, I did
    test it with
    
    ```sc.textFile("latlon.tsv").take(5)```
    
    many times without error.
    
    Additionally, in order to test the unswallowed exceptions, I performed
    
    ```sc.textFile("s3n://<big file>").count()```
    
    and cut my internet. Prior to this patch, we got the "stdin writer
    exited early" message, which is unhelpful. Now, we get the
    SocketExceptions propagated through Spark to the user and get
    proper (but unsuccessful) task retries.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42161131
  
    Oops, this is actually due to a bug in my reimplementation of Python cancellation. onCompleteCallback doesn't get called if the iterator is stuck reading from Python. I can just revert that part of the patch to continue to use a separate thread, or I can add an "onInterruptCallback" to TaskContext. I will look more into it tomorrow morning.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42225583
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14668/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265795
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14702/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42155847
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12358578
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -17,11 +17,13 @@
     
     package org.apache.spark.scheduler
     
    +import scala.language.existentials
    +
     import java.io._
     import java.util.zip.{GZIPInputStream, GZIPOutputStream}
     
     import scala.collection.mutable.HashMap
    -import scala.language.existentials
    +import scala.util.Try
    --- End diff --
    
    nit: Not used, and you probably didn't mean to move the other import


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12389302
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -210,10 +126,15 @@ private[spark] class PythonRDD[T: ClassTag](
                   Array.empty[Byte]
               }
             } catch {
    -          case e: Exception if readerException != null =>
    +
    +          case e: Exception if context.interrupted =>
    +            logDebug("Exception thrown after task interruption", e)
    +            throw new TaskKilledException
    +
    +          case e: Exception if writerThread.exception.isDefined =>
                 logError("Python worker exited unexpectedly (crashed)", e)
    -            logError("Python crash may have been caused by prior exception:", readerException)
    -            throw readerException
    +            logError("This may have been caused by a prior exception:", writerThread.exception.get)
    +            throw writerThread.exception.get
     
               case eof: EOFException =>
                 throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
    --- End diff --
    
    Ok, yeah I also did a quick elsewhere: it looks like `IOExceptions` are triggered if the stream is abruptly closed on our side (among other unexpected errors), but `EOFExceptions` are triggered if the stream is closed on their side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265760
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42266320
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42269223
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12358888
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -17,11 +17,13 @@
     
     package org.apache.spark.scheduler
     
    +import scala.language.existentials
    +
     import java.io._
     import java.util.zip.{GZIPInputStream, GZIPOutputStream}
     
     import scala.collection.mutable.HashMap
    -import scala.language.existentials
    +import scala.util.Try
    --- End diff --
    
    (talked offline, but just in case anyone else was curious, the order is scala.language.* > java > scala > library > spark. The scala language features are made special because they're intended to change the way the compiler behaves, although it's more of a futureproof thing right now.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42266323
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42159629
  
    Looks like the Python tests may have timed out; do they work locally?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12266441
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -224,10 +150,70 @@ private[spark] class PythonRDD[T: ClassTag](
     
           def hasNext = _nextObj.length != 0
         }
    -    stdoutIterator
    +    new InterruptibleIterator(context, stdoutIterator)
       }
     
       val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
    +
    +  /**
    +   * The thread responsible for writing the data from the PythonRDD's parent iterator to the
    +   * Python process.
    +   */
    +  class WriterThread(env: SparkEnv, worker: Socket, split: Partition, context: TaskContext)
    +    extends Thread(s"stdout writer for $pythonExec") {
    +
    +    @volatile private var _exception: Exception = null
    +
    +    /** Contains the exception thrown while writing the parent iterator to the Python process. */
    +    def exception: Option[Exception] = Option(_exception)
    +
    +    /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
    +    def shutdownOnTaskCompletion() {
    +      assert(context.completed)
    +      this.interrupt()
    +    }
    +
    +    override def run() {
    --- End diff --
    
    The only change I made to this method (other than moving it) was in the exception handler, where I removed the catch for IOException and FileNotFoundException and added a check for context.completed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42371683
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14739/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12266433
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -56,122 +56,46 @@ private[spark] class PythonRDD[T: ClassTag](
         val env = SparkEnv.get
         val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
     
    -    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
    -    context.addOnCompleteCallback(() =>
    +    // Start a thread to feed the process input from our parent's iterator
    +    val writerThread = new WriterThread(env, worker, split, context)
    +
    +    context.addOnCompleteCallback { () =>
    +      writerThread.shutdownOnTaskCompletion()
    +
    +      // Cleanup the worker socket. This will also cause the Python worker to exit.
           try {
             worker.close()
           } catch {
             case e: Exception => logWarning("Failed to close worker socket", e)
           }
    -    )
    -
    -    @volatile var readerException: Exception = null
     
    -    // Start a thread to feed the process input from our parent's iterator
    -    new Thread("stdin writer for " + pythonExec) {
    -      override def run() {
    +      // The python worker must be destroyed in the event of cancellation to ensure it unblocks.
    +      if (context.interrupted) {
             try {
    -          SparkEnv.set(env)
    -          val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
    -          val dataOut = new DataOutputStream(stream)
    -          // Partition index
    -          dataOut.writeInt(split.index)
    -          // sparkFilesDir
    -          PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
    -          // Broadcast variables
    -          dataOut.writeInt(broadcastVars.length)
    -          for (broadcast <- broadcastVars) {
    -            dataOut.writeLong(broadcast.id)
    -            dataOut.writeInt(broadcast.value.length)
    -            dataOut.write(broadcast.value)
    -          }
    -          // Python includes (*.zip and *.egg files)
    -          dataOut.writeInt(pythonIncludes.length)
    -          for (include <- pythonIncludes) {
    -            PythonRDD.writeUTF(include, dataOut)
    -          }
    -          dataOut.flush()
    -          // Serialized command:
    -          dataOut.writeInt(command.length)
    -          dataOut.write(command)
    -          // Data values
    -          PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
    -          dataOut.flush()
    -          worker.shutdownOutput()
    +          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    +          env.destroyPythonWorker(pythonExec, envVars.toMap)
             } catch {
    -
    -          case e: java.io.FileNotFoundException =>
    -            readerException = e
    -            Try(worker.shutdownOutput()) // kill Python worker process
    -
    -          case e: IOException =>
    -            // This can happen for legitimate reasons if the Python code stops returning data
    -            // before we are done passing elements through, e.g., for take(). Just log a message to
    -            // say it happened (as it could also be hiding a real IOException from a data source).
    -            logInfo("stdin writer to Python finished early (may not be an error)", e)
    -
    -          case e: Exception =>
    -            // We must avoid throwing exceptions here, because the thread uncaught exception handler
    -            // will kill the whole executor (see Executor).
    -            readerException = e
    -            Try(worker.shutdownOutput()) // kill Python worker process
    +          case e: Exception => logError("Exception when trying to kill worker", e)
             }
           }
    -    }.start()
    -
    -    // Necessary to distinguish between a task that has failed and a task that is finished
    -    @volatile var complete: Boolean = false
    -
    -    // It is necessary to have a monitor thread for python workers if the user cancels with
    -    // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
    -    // threads can block indefinitely.
    -    new Thread(s"Worker Monitor for $pythonExec") {
    -      override def run() {
    -        // Kill the worker if it is interrupted or completed
    -        // When a python task completes, the context is always set to interupted
    -        while (!context.interrupted) {
    -          Thread.sleep(2000)
    -        }
    -        if (!complete) {
    -          try {
    -            logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    -            env.destroyPythonWorker(pythonExec, envVars.toMap)
    -          } catch {
    -            case e: Exception =>
    -              logError("Exception when trying to kill worker", e)
    -          }
    -        }
    -      }
    -    }.start()
    -
    -    /*
    -     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
    -     * other completion callbacks might invalidate the input. Because interruption
    -     * is not synchronous this still leaves a potential race where the interruption is
    -     * processed only after the stream becomes invalid.
    -     */
    -    context.addOnCompleteCallback{ () =>
    -      complete = true // Indicate that the task has completed successfully
    -      context.interrupted = true
    --- End diff --
    
    @pwendell I removed this line and the comment regarding SPARK-1019 because we now check for task completion in the worker thread before throwing an exception. Please let me know if this fix is insufficient (I'm not really sure why we had to interrupt before?).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42267876
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14700/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by jey <gi...@git.apache.org>.
Github user jey commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12354383
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -56,122 +56,46 @@ private[spark] class PythonRDD[T: ClassTag](
         val env = SparkEnv.get
         val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
     
    -    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
    -    context.addOnCompleteCallback(() =>
    +    // Start a thread to feed the process input from our parent's iterator
    +    val writerThread = new WriterThread(env, worker, split, context)
    +
    +    context.addOnCompleteCallback { () =>
    +      writerThread.shutdownOnTaskCompletion()
    +
    +      // Cleanup the worker socket. This will also cause the Python worker to exit.
           try {
             worker.close()
           } catch {
             case e: Exception => logWarning("Failed to close worker socket", e)
           }
    -    )
    -
    -    @volatile var readerException: Exception = null
     
    -    // Start a thread to feed the process input from our parent's iterator
    -    new Thread("stdin writer for " + pythonExec) {
    -      override def run() {
    +      // The python worker must be destroyed in the event of cancellation to ensure it unblocks.
    +      if (context.interrupted) {
             try {
    -          SparkEnv.set(env)
    -          val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
    -          val dataOut = new DataOutputStream(stream)
    -          // Partition index
    -          dataOut.writeInt(split.index)
    -          // sparkFilesDir
    -          PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
    -          // Broadcast variables
    -          dataOut.writeInt(broadcastVars.length)
    -          for (broadcast <- broadcastVars) {
    -            dataOut.writeLong(broadcast.id)
    -            dataOut.writeInt(broadcast.value.length)
    -            dataOut.write(broadcast.value)
    -          }
    -          // Python includes (*.zip and *.egg files)
    -          dataOut.writeInt(pythonIncludes.length)
    -          for (include <- pythonIncludes) {
    -            PythonRDD.writeUTF(include, dataOut)
    -          }
    -          dataOut.flush()
    -          // Serialized command:
    -          dataOut.writeInt(command.length)
    -          dataOut.write(command)
    -          // Data values
    -          PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
    -          dataOut.flush()
    -          worker.shutdownOutput()
    +          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    +          env.destroyPythonWorker(pythonExec, envVars.toMap)
             } catch {
    -
    -          case e: java.io.FileNotFoundException =>
    -            readerException = e
    -            Try(worker.shutdownOutput()) // kill Python worker process
    -
    -          case e: IOException =>
    -            // This can happen for legitimate reasons if the Python code stops returning data
    -            // before we are done passing elements through, e.g., for take(). Just log a message to
    -            // say it happened (as it could also be hiding a real IOException from a data source).
    -            logInfo("stdin writer to Python finished early (may not be an error)", e)
    -
    -          case e: Exception =>
    -            // We must avoid throwing exceptions here, because the thread uncaught exception handler
    -            // will kill the whole executor (see Executor).
    -            readerException = e
    -            Try(worker.shutdownOutput()) // kill Python worker process
    +          case e: Exception => logError("Exception when trying to kill worker", e)
             }
           }
    -    }.start()
    -
    -    // Necessary to distinguish between a task that has failed and a task that is finished
    -    @volatile var complete: Boolean = false
    -
    -    // It is necessary to have a monitor thread for python workers if the user cancels with
    -    // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
    -    // threads can block indefinitely.
    -    new Thread(s"Worker Monitor for $pythonExec") {
    -      override def run() {
    -        // Kill the worker if it is interrupted or completed
    -        // When a python task completes, the context is always set to interupted
    -        while (!context.interrupted) {
    -          Thread.sleep(2000)
    -        }
    -        if (!complete) {
    -          try {
    -            logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    -            env.destroyPythonWorker(pythonExec, envVars.toMap)
    -          } catch {
    -            case e: Exception =>
    -              logError("Exception when trying to kill worker", e)
    -          }
    -        }
    -      }
    -    }.start()
    -
    -    /*
    -     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
    -     * other completion callbacks might invalidate the input. Because interruption
    -     * is not synchronous this still leaves a potential race where the interruption is
    -     * processed only after the stream becomes invalid.
    -     */
    -    context.addOnCompleteCallback{ () =>
    -      complete = true // Indicate that the task has completed successfully
    -      context.interrupted = true
         }
     
    +    writerThread.start()
    +
         // Return an iterator that read lines from the process's stdout
         val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
         val stdoutIterator = new Iterator[Array[Byte]] {
           def next(): Array[Byte] = {
             val obj = _nextObj
             if (hasNext) {
    -          // FIXME: can deadlock if worker is waiting for us to
    --- End diff --
    
    Yep, sounds right to me. Thanks, @aarondav.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42251472
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12358676
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -17,11 +17,13 @@
     
     package org.apache.spark.scheduler
     
    +import scala.language.existentials
    +
     import java.io._
     import java.util.zip.{GZIPInputStream, GZIPOutputStream}
     
     import scala.collection.mutable.HashMap
    -import scala.language.existentials
    +import scala.util.Try
    --- End diff --
    
    o, so the new order is `scala > java > library > spark`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42264941
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14698/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42264898
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42214672
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265794
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12358598
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
    @@ -17,11 +17,13 @@
     
     package org.apache.spark.scheduler
     
    +import scala.language.existentials
    +
     import java.io._
     import java.util.zip.{GZIPInputStream, GZIPOutputStream}
     
     import scala.collection.mutable.HashMap
    -import scala.language.existentials
    +import scala.util.Try
    --- End diff --
    
    Mm, it was decided (on the dev list) that we'd put Scala language features above all other imports. I guess that hasn't made its way to the code style guide yet...
    
    Removed the Try.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42225580
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265262
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12351428
  
    --- Diff: python/pyspark/daemon.py ---
    @@ -74,6 +74,15 @@ def handle_sigchld(*args):
                     raise
         signal.signal(SIGCHLD, handle_sigchld)
     
    +    # Blocks until the socket is closed by draining the input stream
    +    # until it raises an exception.
    +    def waitSocketClose(sock):
    +        try:
    +            while True:
    --- End diff --
    
    As per our offline discussion this needs to handle the case where the socket shuts down gracefully.
    ```
      if sock.recv(4096) == "":
        return
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by jey <gi...@git.apache.org>.
Github user jey commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12389180
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -210,10 +126,15 @@ private[spark] class PythonRDD[T: ClassTag](
                   Array.empty[Byte]
               }
             } catch {
    -          case e: Exception if readerException != null =>
    +
    +          case e: Exception if context.interrupted =>
    +            logDebug("Exception thrown after task interruption", e)
    +            throw new TaskKilledException
    +
    +          case e: Exception if writerThread.exception.isDefined =>
                 logError("Python worker exited unexpectedly (crashed)", e)
    -            logError("Python crash may have been caused by prior exception:", readerException)
    -            throw readerException
    +            logError("This may have been caused by a prior exception:", writerThread.exception.get)
    +            throw writerThread.exception.get
     
               case eof: EOFException =>
                 throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
    --- End diff --
    
    I think the intention here was to only handle EOFs because EOFs are considered "expected exceptions", whereas other IOExceptions are "unexpected" and therefore just propagated up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42155846
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42214655
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265598
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42267875
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42264940
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/640


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265606
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42251481
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42366716
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42158468
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12359121
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -210,10 +126,15 @@ private[spark] class PythonRDD[T: ClassTag](
                   Array.empty[Byte]
               }
             } catch {
    -          case e: Exception if readerException != null =>
    +
    +          case e: Exception if context.interrupted =>
    +            logDebug("Exception thrown after task interruption", e)
    +            throw new TaskKilledException
    +
    +          case e: Exception if writerThread.exception.isDefined =>
                 logError("Python worker exited unexpectedly (crashed)", e)
    -            logError("Python crash may have been caused by prior exception:", readerException)
    -            throw readerException
    +            logError("This may have been caused by a prior exception:", writerThread.exception.get)
    +            throw writerThread.exception.get
     
               case eof: EOFException =>
                 throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
    --- End diff --
    
    Not your code, but looks like this also applies to `IOException`, when the stream is abruptly closed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42452187
  
    Okay I did a quick pass and this looks good. I'm going to pull this in. Thanks @aarondav!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42264902
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42267871
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42267873
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14699/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12266425
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -56,122 +56,46 @@ private[spark] class PythonRDD[T: ClassTag](
         val env = SparkEnv.get
         val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
     
    -    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
    -    context.addOnCompleteCallback(() =>
    +    // Start a thread to feed the process input from our parent's iterator
    +    val writerThread = new WriterThread(env, worker, split, context)
    +
    +    context.addOnCompleteCallback { () =>
    +      writerThread.shutdownOnTaskCompletion()
    +
    +      // Cleanup the worker socket. This will also cause the Python worker to exit.
           try {
             worker.close()
           } catch {
             case e: Exception => logWarning("Failed to close worker socket", e)
           }
    -    )
    -
    -    @volatile var readerException: Exception = null
     
    -    // Start a thread to feed the process input from our parent's iterator
    -    new Thread("stdin writer for " + pythonExec) {
    -      override def run() {
    +      // The python worker must be destroyed in the event of cancellation to ensure it unblocks.
    +      if (context.interrupted) {
             try {
    -          SparkEnv.set(env)
    -          val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
    -          val dataOut = new DataOutputStream(stream)
    -          // Partition index
    -          dataOut.writeInt(split.index)
    -          // sparkFilesDir
    -          PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
    -          // Broadcast variables
    -          dataOut.writeInt(broadcastVars.length)
    -          for (broadcast <- broadcastVars) {
    -            dataOut.writeLong(broadcast.id)
    -            dataOut.writeInt(broadcast.value.length)
    -            dataOut.write(broadcast.value)
    -          }
    -          // Python includes (*.zip and *.egg files)
    -          dataOut.writeInt(pythonIncludes.length)
    -          for (include <- pythonIncludes) {
    -            PythonRDD.writeUTF(include, dataOut)
    -          }
    -          dataOut.flush()
    -          // Serialized command:
    -          dataOut.writeInt(command.length)
    -          dataOut.write(command)
    -          // Data values
    -          PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
    -          dataOut.flush()
    -          worker.shutdownOutput()
    +          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    +          env.destroyPythonWorker(pythonExec, envVars.toMap)
             } catch {
    -
    -          case e: java.io.FileNotFoundException =>
    -            readerException = e
    -            Try(worker.shutdownOutput()) // kill Python worker process
    -
    -          case e: IOException =>
    -            // This can happen for legitimate reasons if the Python code stops returning data
    -            // before we are done passing elements through, e.g., for take(). Just log a message to
    -            // say it happened (as it could also be hiding a real IOException from a data source).
    -            logInfo("stdin writer to Python finished early (may not be an error)", e)
    -
    -          case e: Exception =>
    -            // We must avoid throwing exceptions here, because the thread uncaught exception handler
    -            // will kill the whole executor (see Executor).
    -            readerException = e
    -            Try(worker.shutdownOutput()) // kill Python worker process
    +          case e: Exception => logError("Exception when trying to kill worker", e)
             }
           }
    -    }.start()
    -
    -    // Necessary to distinguish between a task that has failed and a task that is finished
    -    @volatile var complete: Boolean = false
    -
    -    // It is necessary to have a monitor thread for python workers if the user cancels with
    -    // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
    -    // threads can block indefinitely.
    -    new Thread(s"Worker Monitor for $pythonExec") {
    -      override def run() {
    -        // Kill the worker if it is interrupted or completed
    -        // When a python task completes, the context is always set to interupted
    -        while (!context.interrupted) {
    -          Thread.sleep(2000)
    -        }
    -        if (!complete) {
    -          try {
    -            logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    -            env.destroyPythonWorker(pythonExec, envVars.toMap)
    -          } catch {
    -            case e: Exception =>
    -              logError("Exception when trying to kill worker", e)
    -          }
    -        }
    -      }
    -    }.start()
    -
    -    /*
    -     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
    -     * other completion callbacks might invalidate the input. Because interruption
    -     * is not synchronous this still leaves a potential race where the interruption is
    -     * processed only after the stream becomes invalid.
    -     */
    -    context.addOnCompleteCallback{ () =>
    -      complete = true // Indicate that the task has completed successfully
    -      context.interrupted = true
         }
     
    +    writerThread.start()
    +
         // Return an iterator that read lines from the process's stdout
         val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
         val stdoutIterator = new Iterator[Array[Byte]] {
           def next(): Array[Byte] = {
             val obj = _nextObj
             if (hasNext) {
    -          // FIXME: can deadlock if worker is waiting for us to
    --- End diff --
    
    @jey I removed this comment because after some effort I could not figure out how a deadlock could occur since the only "response" mechanism is via the stdout writer thread which does not interact with this iterator. Please let me know if this comment is still valid.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42269224
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14703/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265769
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42366696
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/640#discussion_r12266419
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -56,122 +56,46 @@ private[spark] class PythonRDD[T: ClassTag](
         val env = SparkEnv.get
         val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
     
    -    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
    -    context.addOnCompleteCallback(() =>
    +    // Start a thread to feed the process input from our parent's iterator
    +    val writerThread = new WriterThread(env, worker, split, context)
    +
    +    context.addOnCompleteCallback { () =>
    +      writerThread.shutdownOnTaskCompletion()
    +
    +      // Cleanup the worker socket. This will also cause the Python worker to exit.
           try {
             worker.close()
           } catch {
             case e: Exception => logWarning("Failed to close worker socket", e)
           }
    -    )
    -
    -    @volatile var readerException: Exception = null
     
    -    // Start a thread to feed the process input from our parent's iterator
    -    new Thread("stdin writer for " + pythonExec) {
    -      override def run() {
    +      // The python worker must be destroyed in the event of cancellation to ensure it unblocks.
    +      if (context.interrupted) {
             try {
    -          SparkEnv.set(env)
    -          val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
    -          val dataOut = new DataOutputStream(stream)
    -          // Partition index
    -          dataOut.writeInt(split.index)
    -          // sparkFilesDir
    -          PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
    -          // Broadcast variables
    -          dataOut.writeInt(broadcastVars.length)
    -          for (broadcast <- broadcastVars) {
    -            dataOut.writeLong(broadcast.id)
    -            dataOut.writeInt(broadcast.value.length)
    -            dataOut.write(broadcast.value)
    -          }
    -          // Python includes (*.zip and *.egg files)
    -          dataOut.writeInt(pythonIncludes.length)
    -          for (include <- pythonIncludes) {
    -            PythonRDD.writeUTF(include, dataOut)
    -          }
    -          dataOut.flush()
    -          // Serialized command:
    -          dataOut.writeInt(command.length)
    -          dataOut.write(command)
    -          // Data values
    -          PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
    -          dataOut.flush()
    -          worker.shutdownOutput()
    +          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
    +          env.destroyPythonWorker(pythonExec, envVars.toMap)
    --- End diff --
    
    @ahirreddy This logic replaces the monitor thread. Please take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42371681
  
    Merged build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42257147
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42265254
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42257150
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14685/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1579: Clean up PythonRDD and avoid swall...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/640#issuecomment-42158471
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14655/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---