You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/24 01:08:55 UTC

spark git commit: [SPARK-4606] Send EOF to child JVM when there's no more data to read.

Repository: spark
Updated Branches:
  refs/heads/master 3f5f4cc4e -> 7e2deb71c


[SPARK-4606] Send EOF to child JVM when there's no more data to read.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #3460 from vanzin/SPARK-4606 and squashes the following commits:

031207d [Marcelo Vanzin] [SPARK-4606] Send EOF to child JVM when there's no more data to read.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e2deb71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e2deb71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e2deb71

Branch: refs/heads/master
Commit: 7e2deb71c4239564631b19c748e95c3d1aa1c77d
Parents: 3f5f4cc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Dec 23 16:02:59 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Dec 23 16:07:59 2014 -0800

----------------------------------------------------------------------
 .../deploy/SparkSubmitDriverBootstrapper.scala  |  3 ++-
 .../scala/org/apache/spark/util/Utils.scala     | 24 ++++++++++++++------
 2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e2deb71/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index d2687fa..2eab998 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val isWindows = Utils.isWindows
     val isSubprocess = sys.env.contains("IS_SUBPROCESS")
     if (!isWindows) {
-      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
+        propagateEof = true)
       stdinThread.start()
       // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
       // broken pipe, signaling that the parent process has exited. This is the case if the

http://git-wip-us.apache.org/repos/asf/spark/blob/7e2deb71/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8c00f2c..0d771ba 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1847,19 +1847,29 @@ private[spark] object Utils extends Logging {
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */
-private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+private[spark] class RedirectThread(
+    in: InputStream,
+    out: OutputStream,
+    name: String,
+    propagateEof: Boolean = false)
   extends Thread(name) {
 
   setDaemon(true)
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      val buf = new Array[Byte](1024)
-      var len = in.read(buf)
-      while (len != -1) {
-        out.write(buf, 0, len)
-        out.flush()
-        len = in.read(buf)
+      try {
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      } finally {
+        if (propagateEof) {
+          out.close()
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org